From f2733e87e6d52545e763776916d5690f93b4450e Mon Sep 17 00:00:00 2001 From: Guillermo Calvo Date: Tue, 3 Oct 2023 18:03:17 +0200 Subject: [PATCH 1/4] Add tests --- .../errors/KafkaBatchErrorStrategySpec.groovy | 253 ++++++++++++++++++ 1 file changed, 253 insertions(+) create mode 100644 kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy new file mode 100644 index 000000000..aaf7729bc --- /dev/null +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy @@ -0,0 +1,253 @@ +package io.micronaut.configuration.kafka.errors + +import io.micronaut.configuration.kafka.AbstractEmbeddedServerSpec +import io.micronaut.configuration.kafka.ConsumerRegistry +import io.micronaut.configuration.kafka.annotation.ErrorStrategy +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.configuration.kafka.annotation.KafkaListener +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.configuration.kafka.exceptions.KafkaListenerException +import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler +import io.micronaut.context.annotation.Property +import io.micronaut.context.annotation.Requires +import org.apache.kafka.common.errors.RecordDeserializationException + +import java.util.concurrent.atomic.AtomicInteger +import java.util.stream.Collectors + +import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.* +import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST + +class KafkaBatchErrorStrategySpec extends AbstractEmbeddedServerSpec { + + static final String BATCH_MODE_RESUME = "batch-mode-resume" + static final String BATCH_MODE_RETRY = "batch-mode-retry" + static final String BATCH_MODE_RETRY_EXP = "batch-mode-retry-exp" + static final String BATCH_MODE_RETRY_DESER = "batch-mode-retry-deser" + static final String BATCH_MODE_RETRY_HANDLE_ALL = "batch-mode-retry-handle-all" + + void "test batch mode with 'resume' error strategy"() { + when: "A consumer throws an exception" + MyClient myClient = context.getBean(MyClient) + myClient.sendBatch(BATCH_MODE_RESUME, ['One', 'Two']) + myClient.sendBatch(BATCH_MODE_RESUME, ['Three', 'Four']) + + ResumeConsumer myConsumer = context.getBean(ResumeConsumer) + context.getBean(ConsumerRegistry).resume(BATCH_MODE_RESUME) + + then: "The batch that threw the exception was skipped and the next batch was processed" + conditions.eventually { + myConsumer.received == ['One/Two', 'Three/Four'] || + myConsumer.received == ['One', 'Two/Three', 'Four'] + } + } + + void "test batch mode with 'retry' error strategy"() { + when: "A consumer throws an exception" + MyClient myClient = context.getBean(MyClient) + myClient.sendBatch(BATCH_MODE_RETRY, ['One', 'Two']) + myClient.sendBatch(BATCH_MODE_RETRY, ['Three', 'Four']) + + RetryConsumer myConsumer = context.getBean(RetryConsumer) + context.getBean(ConsumerRegistry).resume(BATCH_MODE_RETRY) + + then: "The batch that threw the exception was re-consumed" + conditions.eventually { + myConsumer.received == ['One/Two', 'One/Two', 'Three/Four'] + } + + and: "The retry was delivered at least 50ms afterwards" + myConsumer.times[1] - myConsumer.times[0] >= 500 + } + + void "test batch mode with 'retry' error strategy when there are 
serialization errors"() { + when: "A record cannot be deserialized" + MyClient myClient = context.getBean(MyClient) + myClient.sendBatchOfNumbers(BATCH_MODE_RETRY_DESER, [111, 222]) + myClient.sendBatchOfNumbers(BATCH_MODE_RETRY_DESER, [333]) + myClient.sendBatch(BATCH_MODE_RETRY_DESER, ['Not an integer']) + myClient.sendBatchOfNumbers(BATCH_MODE_RETRY_DESER, [444, 555]) + + RetryDeserConsumer myConsumer = context.getBean(RetryDeserConsumer) + context.getBean(ConsumerRegistry).resume(BATCH_MODE_RETRY_DESER) + + then: "The message that threw the exception was eventually left behind" + conditions.eventually { + myConsumer.received == ['111/222', '333', '444/555'] + } + + and: "The retry error strategy was honored" + myConsumer.exceptions.size() == 2 + myConsumer.exceptions[0].message.startsWith('Error deserializing key/value') + (myConsumer.exceptions[0].cause as RecordDeserializationException).offset == 3 + myConsumer.exceptions[1].message.startsWith('Error deserializing key/value') + (myConsumer.exceptions[1].cause as RecordDeserializationException).offset == 3 + } + + void "test batch mode with 'retry exp' error strategy"() { + when: "A consumer throws an exception" + MyClient myClient = context.getBean(MyClient) + myClient.sendBatch(BATCH_MODE_RETRY_EXP, ['One', 'Two']) + myClient.sendBatch(BATCH_MODE_RETRY_EXP, ['Three', 'Four']) + + RetryExpConsumer myConsumer = context.getBean(RetryExpConsumer) + context.getBean(ConsumerRegistry).resume(BATCH_MODE_RETRY_EXP) + + then: "Batch is consumed eventually" + conditions.eventually { + myConsumer.received == ['One/Two', 'One/Two', 'One/Two', 'One/Two', 'Three/Four'] + } + + and: "Batch was retried with exponential breaks between deliveries" + myConsumer.times[1] - myConsumer.times[0] >= 50 + myConsumer.times[2] - myConsumer.times[1] >= 100 + myConsumer.times[3] - myConsumer.times[2] >= 200 + } + + void "test batch mode with 'retry' error strategy + handle all exceptions"() { + when: "A consumer throws an exception" + MyClient myClient = context.getBean(MyClient) + myClient.sendBatch(BATCH_MODE_RETRY_HANDLE_ALL, ['One', 'Two']) + myClient.sendBatch(BATCH_MODE_RETRY_HANDLE_ALL, ['Three', 'Four']) + myClient.sendBatch(BATCH_MODE_RETRY_HANDLE_ALL, ['Five', 'Six']) + myClient.sendBatch(BATCH_MODE_RETRY_HANDLE_ALL, ['Seven', 'Eight']) + + RetryHandleAllConsumer myConsumer = context.getBean(RetryHandleAllConsumer) + context.getBean(ConsumerRegistry).resume(BATCH_MODE_RETRY_HANDLE_ALL) + + then: "Batches were retried and consumed eventually" + conditions.eventually { + myConsumer.received == ['One/Two', 'Three/Four', 'Three/Four', 'Five/Six', 'Five/Six', 'Five/Six', 'Seven/Eight'] + } + + and: "All exceptions were handled" + myConsumer.exceptions.size() == 4 + myConsumer.exceptions[0].message == "[Three, Four] #2" + myConsumer.exceptions[0].consumerRecords.orElseThrow() + myConsumer.exceptions[1].message == "[Five, Six] #4" + myConsumer.exceptions[1].consumerRecords.orElseThrow() + myConsumer.exceptions[2].message == "[Five, Six] #5" + myConsumer.exceptions[2].consumerRecords.orElseThrow() + myConsumer.exceptions[3].message == "[Five, Six] #6" + myConsumer.exceptions[3].consumerRecords.orElseThrow() + } + + @Requires(property = 'spec.name', value = 'KafkaBatchErrorStrategySpec') + @KafkaClient(batch = true) + static interface MyClient { + void sendBatch(@Topic String topic, List messages) + + void sendBatchOfNumbers(@Topic String topic, List numbers) + } + + static abstract class AbstractConsumer { + AtomicInteger count = new AtomicInteger(0) + List 
received = [] + List exceptions = [] + + String concatenate(List messages) { + return messages.stream().map(Object::toString).collect(Collectors.joining('/')) + } + } + + @Requires(property = 'spec.name', value = 'KafkaBatchErrorStrategySpec') + @KafkaListener( + clientId = BATCH_MODE_RESUME, + batch = true, + autoStartup = false, + offsetReset = EARLIEST, + errorStrategy = @ErrorStrategy(value = RESUME_AT_NEXT_RECORD), + properties = @Property(name = 'max.poll.records', value = '2')) + static class ResumeConsumer extends AbstractConsumer { + @Topic(BATCH_MODE_RESUME) + void receiveBatch(List messages) { + received << concatenate(messages) + if (count.getAndIncrement() == 0) throw new RuntimeException("Won't handle first batch") + } + } + + @Requires(property = 'spec.name', value = 'KafkaBatchErrorStrategySpec') + @KafkaListener( + clientId = BATCH_MODE_RETRY, + batch = true, + autoStartup = false, + offsetReset = EARLIEST, + errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryDelay = '500ms'), + properties = @Property(name = 'max.poll.records', value = '2')) + static class RetryConsumer extends AbstractConsumer { + List times = [] + + @Topic(BATCH_MODE_RETRY) + void handleBatch(List messages) { + received << concatenate(messages) + times << System.currentTimeMillis() + if (count.getAndIncrement() == 0) { + throw new RuntimeException("Won't handle first batch") + } + } + } + + @Requires(property = 'spec.name', value = 'KafkaBatchErrorStrategySpec') + @KafkaListener( + clientId = BATCH_MODE_RETRY_DESER, + batch = true, + autoStartup = false, + offsetReset = EARLIEST, + errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, handleAllExceptions = true), + properties = @Property(name = 'max.poll.records', value = '2')) + static class RetryDeserConsumer extends AbstractConsumer implements KafkaListenerExceptionHandler { + + @Topic(BATCH_MODE_RETRY_DESER) + void handleBatch(List numbers) { + received << concatenate(numbers) + } + + @Override + void handle(KafkaListenerException exception) { + exceptions << exception + } + } + + @Requires(property = 'spec.name', value = 'KafkaBatchErrorStrategySpec') + @KafkaListener( + clientId = BATCH_MODE_RETRY_EXP, + batch = true, + autoStartup = false, + offsetReset = EARLIEST, + errorStrategy = @ErrorStrategy(value = RETRY_EXPONENTIALLY_ON_ERROR, retryCount = 3, retryDelay = '50ms'), + properties = @Property(name = 'max.poll.records', value = '2')) + static class RetryExpConsumer extends AbstractConsumer { + List times = [] + + @Topic(BATCH_MODE_RETRY_EXP) + void handleBatch(List messages) { + received << concatenate(messages) + times << System.currentTimeMillis() + if (count.getAndIncrement() < 4) { + throw new RuntimeException("Won't handle first three delivery attempts") + } + } + } + + @Requires(property = 'spec.name', value = 'KafkaBatchErrorStrategySpec') + @KafkaListener( + clientId = BATCH_MODE_RETRY_HANDLE_ALL, + batch = true, + autoStartup = false, + offsetReset = EARLIEST, + errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCount = 2, handleAllExceptions = true), + properties = @Property(name = "max.poll.records", value = "2")) + static class RetryHandleAllConsumer extends AbstractConsumer implements KafkaListenerExceptionHandler { + + @Topic(BATCH_MODE_RETRY_HANDLE_ALL) + void receiveBatch(List messages) { + received << concatenate(messages) + if (count.getAndIncrement() == 1 || messages.contains('Five')) throw new RuntimeException("${messages} #${count}") + } + + @Override + void handle(KafkaListenerException exception) { + 
exceptions << exception + } + } +} From df05b9f14f6e4530df750add705e6113c46966b8 Mon Sep 17 00:00:00 2001 From: Guillermo Calvo Date: Mon, 2 Oct 2023 17:05:08 +0200 Subject: [PATCH 2/4] Add support for batch error strategy --- .../exceptions/KafkaListenerException.java | 56 +++- .../kafka/processor/ConsumerState.java | 290 ++++-------------- .../kafka/processor/ConsumerStateBatch.java | 166 ++++++++++ .../kafka/processor/ConsumerStateSingle.java | 196 ++++++++++++ .../processor/KafkaConsumerProcessor.java | 17 +- .../errors/KafkaBatchErrorStrategySpec.groovy | 12 +- 6 files changed, 477 insertions(+), 260 deletions(-) create mode 100644 kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerStateBatch.java create mode 100644 kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerStateSingle.java diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/exceptions/KafkaListenerException.java b/kafka/src/main/java/io/micronaut/configuration/kafka/exceptions/KafkaListenerException.java index bda4035de..81ecf1823 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/exceptions/KafkaListenerException.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/exceptions/KafkaListenerException.java @@ -15,9 +15,11 @@ */ package io.micronaut.configuration.kafka.exceptions; +import io.micronaut.core.annotation.Nullable; import io.micronaut.messaging.exceptions.MessageListenerException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import java.util.Optional; /** @@ -29,9 +31,10 @@ @SuppressWarnings("WeakerAccess") public class KafkaListenerException extends MessageListenerException { - private final Object listener; - private final Consumer kafkaConsumer; - private final ConsumerRecord consumerRecord; + private final transient Object listener; + private final transient Consumer kafkaConsumer; + private final transient ConsumerRecords consumerRecords; + private final transient ConsumerRecord consumerRecord; /** * Creates a new exception. @@ -42,10 +45,7 @@ public class KafkaListenerException extends MessageListenerException { * @param consumerRecord The consumer record */ public KafkaListenerException(String message, Object listener, Consumer kafkaConsumer, ConsumerRecord consumerRecord) { - super(message); - this.listener = listener; - this.kafkaConsumer = kafkaConsumer; - this.consumerRecord = consumerRecord; + this(message, null, listener, kafkaConsumer, consumerRecord); } /** @@ -58,12 +58,9 @@ public KafkaListenerException(String message, Object listener, Consumer ka * @param consumerRecord The consumer record */ public KafkaListenerException(String message, Throwable cause, Object listener, Consumer kafkaConsumer, ConsumerRecord consumerRecord) { - super(message, cause); - this.listener = listener; - this.kafkaConsumer = kafkaConsumer; - this.consumerRecord = consumerRecord; + this(message, cause, listener, kafkaConsumer, null, consumerRecord); } - + /** * Creates a new exception. * @@ -73,9 +70,31 @@ public KafkaListenerException(String message, Throwable cause, Object listener, * @param consumerRecord The consumer record */ public KafkaListenerException(Throwable cause, Object listener, Consumer kafkaConsumer, ConsumerRecord consumerRecord) { - super(cause.getMessage(), cause); + this(cause.getMessage(), cause, listener, kafkaConsumer, consumerRecord); + } + + /** + * Creates a new exception. 
+ * + * @param message The message + * @param cause The cause + * @param listener The listener + * @param kafkaConsumer The consumer + * @param consumerRecords The batch of consumer records + * @param consumerRecord The consumer record + */ + public KafkaListenerException( + String message, + Throwable cause, + Object listener, + Consumer kafkaConsumer, + @Nullable ConsumerRecords consumerRecords, + @Nullable ConsumerRecord consumerRecord + ) { + super(message, cause); this.listener = listener; this.kafkaConsumer = kafkaConsumer; + this.consumerRecords = consumerRecords; this.consumerRecord = consumerRecord; } @@ -89,6 +108,7 @@ public Object getKafkaListener() { /** * @return The consumer that produced the error */ + @SuppressWarnings("java:S1452") // Remove usage of generic wildcard type public Consumer getKafkaConsumer() { return kafkaConsumer; } @@ -96,7 +116,17 @@ public Object getKafkaListener() { /** * @return The consumer record that was being processed that caused the error */ + @SuppressWarnings("java:S1452") // Remove usage of generic wildcard type public Optional> getConsumerRecord() { return Optional.ofNullable(consumerRecord); } + + /** + * @return The batch of consumer records that was being processed that caused the error + * @since 5.3 + */ + @SuppressWarnings("java:S1452") // Remove usage of generic wildcard type + public Optional> getConsumerRecords() { + return Optional.ofNullable(consumerRecords); + } } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java index 976038b9c..676793727 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java @@ -15,17 +15,12 @@ */ package io.micronaut.configuration.kafka.processor; -import io.micronaut.configuration.kafka.KafkaAcknowledgement; import io.micronaut.configuration.kafka.KafkaMessage; -import io.micronaut.configuration.kafka.annotation.ErrorStrategyValue; import io.micronaut.configuration.kafka.annotation.OffsetStrategy; import io.micronaut.configuration.kafka.exceptions.KafkaListenerException; -import io.micronaut.configuration.kafka.seek.KafkaSeekOperations; -import io.micronaut.configuration.kafka.seek.KafkaSeeker; import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.Nullable; -import io.micronaut.core.async.publisher.Publishers; import io.micronaut.core.type.Argument; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.Callback; @@ -34,7 +29,6 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; -import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.header.internals.RecordHeader; import org.reactivestreams.Publisher; @@ -56,28 +50,27 @@ * @since 5.2 */ @Internal -final class ConsumerState { +abstract class ConsumerState { - private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProcessor.class); // NOSONAR + protected static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProcessor.class); // NOSONAR + protected final KafkaConsumerProcessor kafkaConsumerProcessor; + protected final Object consumerBean; + @Nullable + protected final 
Map topicPartitionRetries; + protected final Map, Object> boundArguments; + protected boolean failed; final ConsumerInfo info; final Consumer kafkaConsumer; final Set subscriptions; Set assignments; - - private final KafkaConsumerProcessor kafkaConsumerProcessor; - private final Object consumerBean; private Set pausedTopicPartitions; private Set pauseRequests; - @Nullable - private Map partitionRetries; private boolean autoPaused; - private final Map, Object> boundArguments; private boolean pollingStarted; - private boolean failed; private volatile ConsumerCloseState closedState; - ConsumerState( + protected ConsumerState( KafkaConsumerProcessor kafkaConsumerProcessor, ConsumerInfo info, Consumer consumer, @@ -92,8 +85,16 @@ final class ConsumerState { this.boundArguments = new HashMap<>(2); Optional.ofNullable(info.consumerArg).ifPresent(argument -> boundArguments.put(argument, kafkaConsumer)); this.closedState = ConsumerCloseState.NOT_STARTED; + this.topicPartitionRetries = this.info.errorStrategy.isRetry() ? new HashMap<>() : null; } + protected abstract ConsumerRecords pollRecords(@Nullable Map currentOffsets); // NOSONAR + + protected abstract void processRecords(ConsumerRecords consumerRecords, Map currentOffsets); // NOSONAR + + @Nullable + protected abstract Map getCurrentOffsets(); + void pause() { pause(assignments); } @@ -171,7 +172,7 @@ private void refreshAssignmentsPollAndProcessRecords() { } throw e; } catch (Exception e) { - handleException(e, null); + handleException(e, null, null); } } @@ -189,7 +190,17 @@ private void refreshAssignments() { private void pollAndProcessRecords() { failed = true; - final ConsumerRecords consumerRecords = pollRecords(); + // We need to retrieve current offsets in case we need to retry the current record or batch + final Map currentOffsets = getCurrentOffsets(); + // Poll records + pauseTopicPartitions(); + final ConsumerRecords consumerRecords = pollRecords(currentOffsets); + closedState = ConsumerCloseState.POLLING; + if (!pollingStarted) { + pollingStarted = true; + kafkaConsumerProcessor.publishStartedPollingEvent(kafkaConsumer); + } + resumeTopicPartitions(); if (consumerRecords == null || consumerRecords.isEmpty()) { return; // No consumer records to process } @@ -198,11 +209,7 @@ private void pollAndProcessRecords() { Argument lastArgument = info.method.getArguments()[info.method.getArguments().length - 1]; boundArguments.put(lastArgument, null); } - if (info.isBatch) { - processAsBatch(consumerRecords); - } else { - processIndividually(consumerRecords); - } + processRecords(consumerRecords, currentOffsets); if (failed) { return; } @@ -210,25 +217,13 @@ private void pollAndProcessRecords() { try { kafkaConsumer.commitSync(); } catch (CommitFailedException e) { - handleException(e, null); + handleException(e, consumerRecords, null); } } else if (info.offsetStrategy == OffsetStrategy.ASYNC) { kafkaConsumer.commitAsync(resolveCommitCallback()); } } - private ConsumerRecords pollRecords() { - pauseTopicPartitions(); - final ConsumerRecords consumerRecords = poll(); - closedState = ConsumerCloseState.POLLING; - if (!pollingStarted) { - pollingStarted = true; - kafkaConsumerProcessor.publishStartedPollingEvent(kafkaConsumer); - } - resumeTopicPartitions(); - return consumerRecords; - } - private synchronized void pauseTopicPartitions() { if (pauseRequests == null || pauseRequests.isEmpty()) { return; @@ -265,133 +260,15 @@ private synchronized void resumeTopicPartitions() { } } - private ConsumerRecords poll() { - // Deserialization errors 
can happen while polling - - // In batch mode, propagate any errors - if (info.isBatch) { - return kafkaConsumer.poll(info.pollTimeout); - } - - // Otherwise, try to honor the configured error strategy - try { - return kafkaConsumer.poll(info.pollTimeout); - } catch (RecordDeserializationException ex) { - LOG.trace("Kafka consumer [{}] failed to deserialize value while polling", info.logMethod, ex); - final TopicPartition tp = ex.topicPartition(); - // By default, seek past the record to continue consumption - kafkaConsumer.seek(tp, ex.offset() + 1); - // The error strategy and the exception handler can still decide what to do about this record - resolveWithErrorStrategy(new ConsumerRecord<>(tp.topic(), tp.partition(), ex.offset(), null, null), ex); - // By now, it's been decided whether this record should be retried and the exception may have been handled - return null; - } - } - - private void processAsBatch(final ConsumerRecords consumerRecords) { - final Object methodResult = kafkaConsumerProcessor.bindAsBatch(info.method, boundArguments, consumerRecords).invoke(consumerBean); - normalizeResult(methodResult).ifPresent(result -> { - final Flux resultFlux = toFlux(result); - final Iterator> iterator = consumerRecords.iterator(); - final java.util.function.Consumer consumeNext = o -> { - if (iterator.hasNext()) { - final ConsumerRecord consumerRecord = iterator.next(); - handleResultFlux(consumerRecord, Flux.just(o), true, consumerRecords); - } - }; - if (Publishers.isConvertibleToPublisher(result) && !info.isBlocking) { - resultFlux.subscribe(consumeNext); - } else { - Optional.ofNullable(resultFlux.collectList().block()).stream().flatMap(List::stream).forEach(consumeNext); - } - }); - failed = false; - } - - @Nullable - private static Optional normalizeResult(@Nullable Object result) { - return Optional.ofNullable(result).map(x -> x.getClass().isArray() ? Arrays.asList((Object[]) x) : x); - } - - private Flux toFlux(Object result) { - if (result instanceof Iterable iterable) { - return Flux.fromIterable(iterable); - } - if (Publishers.isConvertibleToPublisher(result)) { - return kafkaConsumerProcessor.convertPublisher(result); - } - return Flux.just(result); - } - - private void processIndividually(final ConsumerRecords consumerRecords) { - final Map currentOffsets = info.trackPartitions ? 
new HashMap<>() : null; - final Iterator> iterator = consumerRecords.iterator(); - while (iterator.hasNext()) { - final ConsumerRecord consumerRecord = iterator.next(); - - LOG.trace("Kafka consumer [{}] received record: {}", info.logMethod, consumerRecord); - - if (info.trackPartitions) { - final TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition()); - final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1, null); - currentOffsets.put(topicPartition, offsetAndMetadata); - } - - final KafkaSeekOperations seek = Optional.ofNullable(info.seekArg).map(x -> KafkaSeekOperations.newInstance()).orElse(null); - Optional.ofNullable(info.seekArg).ifPresent(argument -> boundArguments.put(argument, seek)); - Optional.ofNullable(info.ackArg).ifPresent(argument -> boundArguments.put(argument, (KafkaAcknowledgement) () -> kafkaConsumer.commitSync(currentOffsets))); - - try { - process(consumerRecord, consumerRecords); - } catch (Exception e) { - if (resolveWithErrorStrategy(consumerRecord, e)) { - resetTheFollowingPartitions(consumerRecord, iterator); - failed = true; - return; - } - } - - if (info.offsetStrategy == OffsetStrategy.SYNC_PER_RECORD) { - commitSync(consumerRecord, currentOffsets); - } else if (info.offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD) { - kafkaConsumer.commitAsync(currentOffsets, resolveCommitCallback()); - } - - if (seek != null) { - // Performs seek operations that were deferred by the user - final KafkaSeeker seeker = KafkaSeeker.newInstance(kafkaConsumer); - seek.forEach(seeker::perform); - } - } - failed = false; - } - - private void process(ConsumerRecord consumerRecord, ConsumerRecords consumerRecords) { - final Object result = kafkaConsumerProcessor.bind(info.method, boundArguments, consumerRecord).invoke(consumerBean); - if (result != null) { - final boolean isPublisher = Publishers.isConvertibleToPublisher(result); - final Flux publisher = isPublisher ? 
kafkaConsumerProcessor.convertPublisher(result) : Flux.just(result); - handleResultFlux(consumerRecord, publisher, isPublisher || info.isBlocking, consumerRecords); - } - } - - private void commitSync(ConsumerRecord consumerRecord, Map currentOffsets) { - try { - kafkaConsumer.commitSync(currentOffsets); - } catch (CommitFailedException e) { - handleException(e, consumerRecord); - } - } - - private void handleResultFlux( + protected void handleResultFlux( + ConsumerRecords consumerRecords, ConsumerRecord consumerRecord, Flux publisher, - boolean isBlocking, - ConsumerRecords consumerRecords + boolean isBlocking ) { final Flux recordMetadataProducer = publisher .flatMap(value -> sendToDestination(value, consumerRecord, consumerRecords)) - .onErrorResume(error -> handleSendToError(error, consumerRecord)); + .onErrorResume(error -> handleSendToError(error, consumerRecords, consumerRecord)); if (isBlocking) { List listRecords = recordMetadataProducer.collectList().block(); @@ -504,15 +381,15 @@ private void sendOffsetsToTransaction(Producer kafkaProducer, Map handleSendToError(Throwable error, ConsumerRecord consumerRecord) { - handleException("Error occurred processing record [" + consumerRecord + "] with Kafka reactive consumer [" + info.method + "]: " + error.getMessage(), error, consumerRecord); + private Publisher handleSendToError(Throwable error, ConsumerRecords consumerRecords, ConsumerRecord consumerRecord) { + handleException("Error occurred processing record [" + consumerRecord + "] with Kafka reactive consumer [" + info.method + "]: " + error.getMessage(), error, consumerRecords, consumerRecord); if (!info.shouldRedeliver) { return Flux.empty(); } return redeliver(consumerRecord) - .doOnError(ex -> handleException("Redelivery failed for record [" + consumerRecord + "] with Kafka reactive consumer [" + info.method + "]: " + error.getMessage(), ex, consumerRecord)); + .doOnError(ex -> handleException("Redelivery failed for record [" + consumerRecord + "] with Kafka reactive consumer [" + info.method + "]: " + error.getMessage(), ex, consumerRecords, consumerRecord)); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -538,59 +415,26 @@ private Mono redeliver(ConsumerRecord consumerRecord) { return Mono.create(emitter -> kafkaProducer.send(producerRecord, new MonoCallback(emitter))); } - private boolean resolveWithErrorStrategy(ConsumerRecord consumerRecord, Throwable e) { - - ErrorStrategyValue currentErrorStrategy = info.errorStrategy; - - if (currentErrorStrategy.isRetry() && !info.exceptionTypes.isEmpty() && - info.exceptionTypes.stream().noneMatch(error -> error.equals(e.getClass()))) { - if (partitionRetries != null) { - partitionRetries.remove(consumerRecord.partition()); - } - // Skip the failing record - currentErrorStrategy = ErrorStrategyValue.RESUME_AT_NEXT_RECORD; + protected void delayRetry(int currentRetryCount, Set partitions) { + // Decide how long should we wait to retry this batch again + final Duration retryDelay = info.errorStrategy.computeRetryDelay(info.retryDelay, + currentRetryCount); + if (retryDelay != null) { + pause(partitions); + kafkaConsumerProcessor.scheduleTask(retryDelay, () -> resume(partitions)); } + } - if (currentErrorStrategy.isRetry() && info.retryCount != 0) { - - final PartitionRetryState retryState = getPartitionRetryState(consumerRecord); - - if (info.retryCount >= retryState.currentRetryCount) { - if (info.shouldHandleAllExceptions) { - handleException(e, consumerRecord); - } - - final TopicPartition topicPartition = new 
TopicPartition(consumerRecord.topic(), consumerRecord.partition()); - kafkaConsumer.seek(topicPartition, consumerRecord.offset()); - - final Duration retryDelay = currentErrorStrategy.computeRetryDelay(info.retryDelay, retryState.currentRetryCount); - if (retryDelay != null) { - // in the stop on error strategy, pause the consumer and resume after the retryDelay duration - final Set paused = Collections.singleton(topicPartition); - pause(paused); - kafkaConsumerProcessor.scheduleTask(retryDelay, () -> resume(paused)); - } - return true; - } else { - partitionRetries.remove(consumerRecord.partition()); - // Skip the failing record - currentErrorStrategy = ErrorStrategyValue.RESUME_AT_NEXT_RECORD; - } - } - - handleException(e, consumerRecord); - - return currentErrorStrategy != ErrorStrategyValue.RESUME_AT_NEXT_RECORD; + protected boolean shouldRetryException(Throwable e) { + return info.exceptionTypes.isEmpty() || + info.exceptionTypes.stream().anyMatch(e.getClass()::equals); } - private PartitionRetryState getPartitionRetryState(ConsumerRecord consumerRecord) { - final PartitionRetryState retryState; - if (partitionRetries == null) { - partitionRetries = new HashMap<>(); - } - retryState = partitionRetries.computeIfAbsent(consumerRecord.partition(), x -> new PartitionRetryState()); - if (retryState.currentRetryOffset != consumerRecord.offset()) { - retryState.currentRetryOffset = consumerRecord.offset(); + protected PartitionRetryState getPartitionRetryState(TopicPartition tp, long currentOffset) { + final PartitionRetryState retryState = topicPartitionRetries + .computeIfAbsent(tp, x -> new PartitionRetryState()); + if (retryState.currentRetryOffset != currentOffset) { + retryState.currentRetryOffset = currentOffset; retryState.currentRetryCount = 1; } else { retryState.currentRetryCount++; @@ -598,30 +442,14 @@ private PartitionRetryState getPartitionRetryState(ConsumerRecord consumer return retryState; } - private void resetTheFollowingPartitions( - ConsumerRecord errorConsumerRecord, - Iterator> iterator - ) { - Set processedPartition = new HashSet<>(); - processedPartition.add(errorConsumerRecord.partition()); - while (iterator.hasNext()) { - ConsumerRecord consumerRecord = iterator.next(); - if (!processedPartition.add(consumerRecord.partition())) { - continue; - } - TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition()); - kafkaConsumer.seek(topicPartition, consumerRecord.offset()); - } - } - - private void handleException(Throwable e, ConsumerRecord consumerRecord) { - kafkaConsumerProcessor.handleException(consumerBean, - new KafkaListenerException(e, consumerBean, kafkaConsumer, consumerRecord)); + protected void handleException(Throwable e, @Nullable ConsumerRecords consumerRecords, + @Nullable ConsumerRecord consumerRecord) { + handleException(e.getMessage(), e, consumerRecords, consumerRecord); } - private void handleException(String message, Throwable e, ConsumerRecord consumerRecord) { + private void handleException(String message, Throwable e, @Nullable ConsumerRecords consumerRecords, @Nullable ConsumerRecord consumerRecord) { kafkaConsumerProcessor.handleException(consumerBean, - new KafkaListenerException(message, e, consumerBean, kafkaConsumer, consumerRecord)); + new KafkaListenerException(message, e, consumerBean, kafkaConsumer, consumerRecords, consumerRecord)); } private OffsetCommitCallback resolveCommitCallback() { diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerStateBatch.java 
b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerStateBatch.java new file mode 100644 index 000000000..0f5401740 --- /dev/null +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerStateBatch.java @@ -0,0 +1,166 @@ +/* + * Copyright 2017-2020 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.configuration.kafka.processor; + +import io.micronaut.configuration.kafka.annotation.ErrorStrategyValue; +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.Nullable; +import io.micronaut.core.async.publisher.Publishers; +import io.micronaut.core.bind.DefaultExecutableBinder; +import io.micronaut.core.bind.ExecutableBinder; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordDeserializationException; +import reactor.core.publisher.Flux; +import reactor.util.function.Tuple2; + +import java.util.Arrays; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.function.Function.identity; + +/** + * The internal state of the consumer in batch mode. + * + * @author Guillermo Calvo + * @since 5.3 + */ +@Internal +final class ConsumerStateBatch extends ConsumerState { + + ConsumerStateBatch(KafkaConsumerProcessor kafkaConsumerProcessor, ConsumerInfo info, Consumer consumer, Object consumerBean) { + super(kafkaConsumerProcessor, info, consumer, consumerBean); + } + + @Override + @Nullable + protected Map getCurrentOffsets() { + return info.errorStrategy.isRetry() ? 
+ kafkaConsumer.assignment().stream().collect(Collectors.toMap(identity(), this::getCurrentOffset)) : null; + } + + @Override + protected ConsumerRecords pollRecords( + @Nullable Map currentOffsets) { + // Deserialization errors can happen while polling + try { + return kafkaConsumer.poll(info.pollTimeout); + } catch (RecordDeserializationException ex) { + // Try to honor the configured error strategy + LOG.trace("Kafka consumer [{}] failed to deserialize value while polling", info.logMethod, ex); + // By default, seek past the record to continue consumption + kafkaConsumer.seek(ex.topicPartition(), ex.offset() + 1); + // The error strategy and the exception handler can still decide what to do about this record + resolveWithErrorStrategy(null, currentOffsets, ex); + // By now, it's been decided whether this record should be retried and the exception may have been handled + return null; + } + } + + @Override + protected void processRecords(ConsumerRecords consumerRecords, @Nullable Map currentOffsets) { + try { + final ExecutableBinder> batchBinder = new DefaultExecutableBinder<>(boundArguments); + final Object result = batchBinder.bind(info.method, kafkaConsumerProcessor.getBatchBinderRegistry(), consumerRecords).invoke(consumerBean); + handleResult(normalizeResult(result), consumerRecords); + failed = false; + } catch (Exception e) { + failed = resolveWithErrorStrategy(consumerRecords, currentOffsets, e); + } + } + + @Nullable + private static Object normalizeResult(@Nullable Object result) { + if (result != null && result.getClass().isArray()) { + return Arrays.asList((Object[]) result); + } + return result; + } + + private void handleResult(Object result, ConsumerRecords consumerRecords) { + if (result != null) { + final boolean isPublisher = Publishers.isConvertibleToPublisher(result); + final boolean isBlocking = info.isBlocking || !isPublisher; + // Flux of tuples (consumer record / result) + final Flux, ?>> resultRecordFlux; + // Flux of consumer records + final Flux> recordFlux = Flux.fromIterable(consumerRecords); + // Flux of results + final Flux resultFlux; + if (result instanceof Iterable iterable) { + resultFlux = Flux.fromIterable(iterable); + } else if (isPublisher) { + resultFlux = kafkaConsumerProcessor.convertPublisher(result); + } else { + resultFlux = Flux.just(result); + } + // Zip consumer record flux with result flux + resultRecordFlux = recordFlux.zipWith(resultFlux) + .doOnNext(x -> handleResultFlux(consumerRecords, x.getT1(), Flux.just(x.getT2()), isBlocking)); + // Block on the zipped flux or subscribe if non-blocking + if (isBlocking) { + resultRecordFlux.blockLast(); + } else { + resultRecordFlux.subscribe(); + } + } + } + + @SuppressWarnings("java:S1874") // ErrorStrategyValue.NONE is deprecated + private boolean resolveWithErrorStrategy(@Nullable ConsumerRecords consumerRecords, + Map currentOffsets, Throwable e) { + if (info.errorStrategy.isRetry()) { + final Set partitions = consumerRecords != null ? 
consumerRecords.partitions() : currentOffsets.keySet(); + if (shouldRetryException(e) && info.retryCount > 0) { + // Check how many retries so far + final int currentRetryCount = getCurrentRetryCount(partitions, currentOffsets); + if (info.retryCount >= currentRetryCount) { + // We will retry this batch again next time + if (info.shouldHandleAllExceptions) { + handleException(e, consumerRecords, null); + } + // Move back to the previous positions + partitions.forEach(tp -> kafkaConsumer.seek(tp, currentOffsets.get(tp).offset())); + // Decide how long should we wait to retry this batch again + delayRetry(currentRetryCount, partitions); + return true; + } + } + // We will NOT retry this batch anymore + partitions.forEach(topicPartitionRetries::remove); + } + // Skip the failing batch of records + handleException(e, consumerRecords, null); + return info.errorStrategy == ErrorStrategyValue.NONE; + } + + private int getCurrentRetryCount(Set partitions, + @Nullable Map currentOffsets) { + return partitions.stream() + .map(tp -> getPartitionRetryState(tp, currentOffsets.get(tp).offset())) + .mapToInt(x -> x.currentRetryCount) + .max().orElse(info.retryCount); + } + + private OffsetAndMetadata getCurrentOffset(TopicPartition tp) { + return new OffsetAndMetadata(kafkaConsumer.position(tp), null); + } +} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerStateSingle.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerStateSingle.java new file mode 100644 index 000000000..feb1fc37f --- /dev/null +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerStateSingle.java @@ -0,0 +1,196 @@ +/* + * Copyright 2017-2020 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.configuration.kafka.processor; + +import io.micronaut.configuration.kafka.KafkaAcknowledgement; +import io.micronaut.configuration.kafka.annotation.ErrorStrategyValue; +import io.micronaut.configuration.kafka.annotation.OffsetStrategy; +import io.micronaut.configuration.kafka.seek.KafkaSeekOperations; +import io.micronaut.configuration.kafka.seek.KafkaSeeker; +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.Nullable; +import io.micronaut.core.async.publisher.Publishers; +import io.micronaut.core.bind.DefaultExecutableBinder; +import io.micronaut.core.bind.ExecutableBinder; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordDeserializationException; +import reactor.core.publisher.Flux; + +import java.util.*; + +/** + * The internal state of the consumer in single mode. 
+ * + * @author Guillermo Calvo + * @since 5.3 + */ +@Internal +final class ConsumerStateSingle extends ConsumerState { + + ConsumerStateSingle(KafkaConsumerProcessor kafkaConsumerProcessor, ConsumerInfo info, Consumer consumer, Object consumerBean) { + super(kafkaConsumerProcessor, info, consumer, consumerBean); + } + + @Override + @Nullable + protected Map getCurrentOffsets() { + return info.trackPartitions ? new HashMap<>() : null; + } + + @Override + protected ConsumerRecords pollRecords(@Nullable Map currentOffsets) { + // Deserialization errors can happen while polling + try { + return kafkaConsumer.poll(info.pollTimeout); + } catch (RecordDeserializationException ex) { + // Try to honor the configured error strategy + LOG.trace("Kafka consumer [{}] failed to deserialize value while polling", info.logMethod, ex); + // By default, seek past the record to continue consumption + kafkaConsumer.seek(ex.topicPartition(), ex.offset() + 1); + // The error strategy and the exception handler can still decide what to do about this record + resolveWithErrorStrategy(null, makeConsumerRecord(ex), ex); + // By now, it's been decided whether this record should be retried and the exception may have been handled + return null; + } + } + + @Override + protected void processRecords(ConsumerRecords consumerRecords, + Map currentOffsets) { + final Iterator> iterator = consumerRecords.iterator(); + while (iterator.hasNext()) { + final ConsumerRecord consumerRecord = iterator.next(); + + LOG.trace("Kafka consumer [{}] received record: {}", info.logMethod, consumerRecord); + + if (info.trackPartitions) { + final TopicPartition topicPartition = getTopicPartition(consumerRecord); + final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1, null); + currentOffsets.put(topicPartition, offsetAndMetadata); + } + + final KafkaSeekOperations seek = Optional.ofNullable(info.seekArg).map(x -> KafkaSeekOperations.newInstance()).orElse(null); + Optional.ofNullable(info.seekArg).ifPresent(argument -> boundArguments.put(argument, seek)); + Optional.ofNullable(info.ackArg).ifPresent(argument -> boundArguments.put(argument, (KafkaAcknowledgement) () -> kafkaConsumer.commitSync(currentOffsets))); + + try { + process(consumerRecord, consumerRecords); + } catch (Exception e) { + if (resolveWithErrorStrategy(consumerRecords, consumerRecord, e)) { + resetTheFollowingPartitions(consumerRecord, iterator); + failed = true; + return; + } + } + + if (info.offsetStrategy == OffsetStrategy.SYNC_PER_RECORD) { + commitSync(consumerRecords, consumerRecord, currentOffsets); + } else if (info.offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD) { + kafkaConsumer.commitAsync(currentOffsets, this::resolveCommitCallback); + } + + if (seek != null) { + // Performs seek operations that were deferred by the user + final KafkaSeeker seeker = KafkaSeeker.newInstance(kafkaConsumer); + seek.forEach(seeker::perform); + } + } + failed = false; + } + + private void process(ConsumerRecord consumerRecord, + ConsumerRecords consumerRecords) { + final ExecutableBinder> executableBinder = new DefaultExecutableBinder<>(boundArguments); + final Object result = executableBinder.bind(info.method, kafkaConsumerProcessor.getBinderRegistry(), consumerRecord).invoke(consumerBean); + if (result != null) { + final boolean isPublisher = Publishers.isConvertibleToPublisher(result); + final Flux publisher = isPublisher ? 
kafkaConsumerProcessor.convertPublisher(result) : Flux.just(result); + handleResultFlux(consumerRecords, consumerRecord, publisher, isPublisher || info.isBlocking); + } + } + + @SuppressWarnings("java:S1874") // ErrorStrategyValue.NONE is deprecated + private boolean resolveWithErrorStrategy(@Nullable ConsumerRecords consumerRecords, + ConsumerRecord consumerRecord, Throwable e) { + if (info.errorStrategy.isRetry()) { + final TopicPartition topicPartition = getTopicPartition(consumerRecord); + if (shouldRetryException(e) && info.retryCount > 0) { + // Check how many retries so far + final int currentRetryCount = getCurrentRetryCount(consumerRecord); + if (info.retryCount >= currentRetryCount) { + + // We will retry this batch again next time + if (info.shouldHandleAllExceptions) { + handleException(e, consumerRecords, consumerRecord); + } + // Move back to the previous position + kafkaConsumer.seek(topicPartition, consumerRecord.offset()); + // Decide how long should we wait to retry this batch again + delayRetry(currentRetryCount, Collections.singleton(topicPartition)); + return true; + } + } + // We will NOT retry this record anymore + topicPartitionRetries.remove(topicPartition); + } + // Skip the failing record + handleException(e, consumerRecords, consumerRecord); + return info.errorStrategy == ErrorStrategyValue.NONE; + } + + private void commitSync(ConsumerRecords consumerRecords, ConsumerRecord consumerRecord, Map currentOffsets) { + try { + kafkaConsumer.commitSync(currentOffsets); + } catch (CommitFailedException e) { + handleException(e, consumerRecords, consumerRecord); + } + } + + private void resolveCommitCallback(Map offsets, Exception exception) { + if (consumerBean instanceof OffsetCommitCallback occ) { + occ.onComplete(offsets, exception); + } else if (exception != null) { + LOG.error("Error asynchronously committing Kafka offsets [{}]: {}", offsets, + exception.getMessage(), exception); + } + } + + private void resetTheFollowingPartitions(ConsumerRecord errorConsumerRecord, Iterator> iterator) { + Set processedPartition = new HashSet<>(); + processedPartition.add(errorConsumerRecord.partition()); + while (iterator.hasNext()) { + ConsumerRecord consumerRecord = iterator.next(); + if (!processedPartition.add(consumerRecord.partition())) { + continue; + } + kafkaConsumer.seek(getTopicPartition(consumerRecord), consumerRecord.offset()); + } + } + + private int getCurrentRetryCount(ConsumerRecord consumerRecord) { + return getPartitionRetryState(getTopicPartition(consumerRecord), consumerRecord.offset()).currentRetryCount; + } + + private static TopicPartition getTopicPartition(ConsumerRecord consumerRecord) { + return new TopicPartition(consumerRecord.topic(), consumerRecord.partition()); + } + + private static ConsumerRecord makeConsumerRecord(RecordDeserializationException ex) { + final TopicPartition tp = ex.topicPartition(); + return new ConsumerRecord<>(tp.topic(), tp.partition(), ex.offset(), null, null); + } +} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java index a45c0e782..f59e55b48 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java @@ -45,9 +45,6 @@ import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.Nullable; import 
io.micronaut.core.async.publisher.Publishers; -import io.micronaut.core.bind.BoundExecutable; -import io.micronaut.core.bind.DefaultExecutableBinder; -import io.micronaut.core.bind.ExecutableBinder; import io.micronaut.core.bind.annotation.Bindable; import io.micronaut.core.naming.NameUtils; import io.micronaut.core.type.Argument; @@ -350,14 +347,12 @@ Flux convertPublisher(T result) { return Flux.from((Publisher) Publishers.convertPublisher(beanContext.getConversionService(), result, Publisher.class)); } - BoundExecutable bind(ExecutableMethod method, Map, Object> boundArguments, ConsumerRecord consumerRecord) { - final ExecutableBinder> executableBinder = new DefaultExecutableBinder<>(boundArguments); - return executableBinder.bind(method, binderRegistry, consumerRecord); + ConsumerRecordBinderRegistry getBinderRegistry() { + return binderRegistry; } - BoundExecutable bindAsBatch(ExecutableMethod method, Map, Object> boundArguments, ConsumerRecords consumerRecords) { - final ExecutableBinder> batchBinder = new DefaultExecutableBinder<>(boundArguments); - return batchBinder.bind(method, batchBinderRegistry, consumerRecords); + BatchConsumerRecordsBinderRegistry getBatchBinderRegistry() { + return batchBinderRegistry; } @SuppressWarnings("rawtypes") @@ -466,7 +461,9 @@ private void submitConsumerThreads(final ExecutableMethod method, topicAnnotations.forEach(a -> setupConsumerSubscription(method, a, consumerBean, kafkaConsumer)); kafkaConsumerSubscribedEventPublisher.publishEvent(new KafkaConsumerSubscribedEvent(kafkaConsumer)); final ConsumerInfo consumerInfo = new ConsumerInfo(finalClientId, groupId, offsetStrategy, consumerAnnotation, method); - final ConsumerState consumerState = new ConsumerState(this, consumerInfo, kafkaConsumer, consumerBean); + final ConsumerState consumerState = consumerInfo.isBatch ? 
+ new ConsumerStateBatch(this, consumerInfo, kafkaConsumer, consumerBean) : + new ConsumerStateSingle(this, consumerInfo, kafkaConsumer, consumerBean); consumers.put(finalClientId, consumerState); executorService.submit(consumerState::threadPollLoop); } diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy index aaf7729bc..47d6bc0c2 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy @@ -79,9 +79,9 @@ class KafkaBatchErrorStrategySpec extends AbstractEmbeddedServerSpec { and: "The retry error strategy was honored" myConsumer.exceptions.size() == 2 myConsumer.exceptions[0].message.startsWith('Error deserializing key/value') - (myConsumer.exceptions[0].cause as RecordDeserializationException).offset == 3 + (myConsumer.exceptions[0].cause as RecordDeserializationException).offset() == 3 myConsumer.exceptions[1].message.startsWith('Error deserializing key/value') - (myConsumer.exceptions[1].cause as RecordDeserializationException).offset == 3 + (myConsumer.exceptions[1].cause as RecordDeserializationException).offset() == 3 } void "test batch mode with 'retry exp' error strategy"() { @@ -145,7 +145,7 @@ class KafkaBatchErrorStrategySpec extends AbstractEmbeddedServerSpec { List received = [] List exceptions = [] - String concatenate(List messages) { + static String concatenate(List messages) { return messages.stream().map(Object::toString).collect(Collectors.joining('/')) } } @@ -178,7 +178,7 @@ class KafkaBatchErrorStrategySpec extends AbstractEmbeddedServerSpec { List times = [] @Topic(BATCH_MODE_RETRY) - void handleBatch(List messages) { + void receiveBatch(List messages) { received << concatenate(messages) times << System.currentTimeMillis() if (count.getAndIncrement() == 0) { @@ -198,7 +198,7 @@ class KafkaBatchErrorStrategySpec extends AbstractEmbeddedServerSpec { static class RetryDeserConsumer extends AbstractConsumer implements KafkaListenerExceptionHandler { @Topic(BATCH_MODE_RETRY_DESER) - void handleBatch(List numbers) { + void receiveBatch(List numbers) { received << concatenate(numbers) } @@ -220,7 +220,7 @@ class KafkaBatchErrorStrategySpec extends AbstractEmbeddedServerSpec { List times = [] @Topic(BATCH_MODE_RETRY_EXP) - void handleBatch(List messages) { + void receiveBatch(List messages) { received << concatenate(messages) times << System.currentTimeMillis() if (count.getAndIncrement() < 4) { From 4b7d7bdf6ec62f34d6608df399030a515d9b0a50 Mon Sep 17 00:00:00 2001 From: Guillermo Calvo Date: Thu, 5 Oct 2023 10:31:13 +0200 Subject: [PATCH 3/4] Small optimization: zip result flux with (iterable) consumer records --- .../kafka/processor/ConsumerStateBatch.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerStateBatch.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerStateBatch.java index 0f5401740..fa611312d 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerStateBatch.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerStateBatch.java @@ -100,9 +100,7 @@ private void handleResult(Object result, ConsumerRecords consumerRecords) final boolean isPublisher = 
Publishers.isConvertibleToPublisher(result); final boolean isBlocking = info.isBlocking || !isPublisher; // Flux of tuples (consumer record / result) - final Flux, ?>> resultRecordFlux; - // Flux of consumer records - final Flux> recordFlux = Flux.fromIterable(consumerRecords); + final Flux>> resultRecordFlux; // Flux of results final Flux resultFlux; if (result instanceof Iterable iterable) { @@ -112,9 +110,9 @@ private void handleResult(Object result, ConsumerRecords consumerRecords) } else { resultFlux = Flux.just(result); } - // Zip consumer record flux with result flux - resultRecordFlux = recordFlux.zipWith(resultFlux) - .doOnNext(x -> handleResultFlux(consumerRecords, x.getT1(), Flux.just(x.getT2()), isBlocking)); + // Zip result flux with consumer records + resultRecordFlux = resultFlux.zipWithIterable(consumerRecords) + .doOnNext(x -> handleResultFlux(consumerRecords, x.getT2(), Flux.just(x.getT1()), isBlocking)); // Block on the zipped flux or subscribe if non-blocking if (isBlocking) { resultRecordFlux.blockLast(); From 766b1dacb8616c8b55374c0be85fbd0ef2bcf4c1 Mon Sep 17 00:00:00 2001 From: Guillermo Calvo Date: Thu, 5 Oct 2023 11:16:46 +0200 Subject: [PATCH 4/4] Refactor test --- .../kafka/errors/KafkaBatchErrorStrategySpec.groovy | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy index 47d6bc0c2..bfc05d2ff 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy @@ -37,8 +37,7 @@ class KafkaBatchErrorStrategySpec extends AbstractEmbeddedServerSpec { then: "The batch that threw the exception was skipped and the next batch was processed" conditions.eventually { - myConsumer.received == ['One/Two', 'Three/Four'] || - myConsumer.received == ['One', 'Two/Three', 'Four'] + concatenate(myConsumer.received) == 'One/Two/Three/Four' } } @@ -144,10 +143,6 @@ class KafkaBatchErrorStrategySpec extends AbstractEmbeddedServerSpec { AtomicInteger count = new AtomicInteger(0) List received = [] List exceptions = [] - - static String concatenate(List messages) { - return messages.stream().map(Object::toString).collect(Collectors.joining('/')) - } } @Requires(property = 'spec.name', value = 'KafkaBatchErrorStrategySpec') @@ -250,4 +245,8 @@ class KafkaBatchErrorStrategySpec extends AbstractEmbeddedServerSpec { exceptions << exception } } + + static String concatenate(List messages) { + return messages.stream().map(Object::toString).collect(Collectors.joining('/')) + } }
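
For reference, below is a minimal sketch of how a batch listener would opt into the retry error strategy that these patches extend to batch mode. It is assembled only from the annotations and callbacks exercised in KafkaBatchErrorStrategySpec and the new KafkaListenerException#getConsumerRecords() accessor; the class name MyBatchListener and the topic name are illustrative and not part of the patch series.

import io.micronaut.configuration.kafka.annotation.ErrorStrategy;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler;

import java.util.List;

import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.RETRY_ON_ERROR;

// Hypothetical batch listener: if receiveBatch throws, the consumer seeks back and
// redelivers the whole batch up to retryCount times, waiting retryDelay between
// deliveries; handleAllExceptions = true reports every failed attempt to the handler.
@KafkaListener(
    batch = true,
    errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCount = 2, retryDelay = "500ms", handleAllExceptions = true)
)
public class MyBatchListener implements KafkaListenerExceptionHandler {

    @Topic("my-batch-topic") // illustrative topic name
    public void receiveBatch(List<String> messages) {
        // Process the whole batch; throwing here triggers the batch retry behavior above.
    }

    @Override
    public void handle(KafkaListenerException exception) {
        // New in this series: the whole failed batch is exposed, not just a single record.
        exception.getConsumerRecords().ifPresent(records -> {
            // e.g. log or dead-letter the records of the failed batch
        });
    }
}

Once the configured retries are exhausted, the failing batch is skipped and the exception handler is invoked one final time, matching the behavior asserted in the 'retry' and 'retry exp' tests above.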