diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/ErrorStrategyValue.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/ErrorStrategyValue.java index 12d72d811..3aa6ac4d0 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/ErrorStrategyValue.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/ErrorStrategyValue.java @@ -15,6 +15,8 @@ */ package io.micronaut.configuration.kafka.annotation; +import java.time.Duration; + /** * Defines the type of error handling strategy that micronaut-kafka will perform in case * of error. The default exception handler or any custom exception handler will be performed @@ -54,5 +56,30 @@ public enum ErrorStrategyValue { * See https://github.com/micronaut-projects/micronaut-kafka/issues/372 */ @Deprecated - NONE + NONE; + + /** + * + * @return Whether this is a retry error strategy. + */ + public boolean isRetry() { + return this == RETRY_ON_ERROR || this == RETRY_EXPONENTIALLY_ON_ERROR; + } + + /** + * Compute retry delay given a fixed delay and the number of attempts. + * + * @param fixedRetryDelay The fixed retry delay. + * @param retryAttempts The number of retries so far. + * @return The amount of time to wait before trying again. + */ + public Duration computeRetryDelay(Duration fixedRetryDelay, long retryAttempts) { + if (!isRetry()) { + return Duration.ZERO; + } + if (this == ErrorStrategyValue.RETRY_EXPONENTIALLY_ON_ERROR) { + return fixedRetryDelay.multipliedBy(1L << (retryAttempts - 1)); + } + return fixedRetryDelay; + } } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java index ad057a01f..02c941084 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java @@ -100,7 +100,7 @@ final class ConsumerState { ? errorStrategyAnnotation.getRequiredValue(ErrorStrategyValue.class) : ErrorStrategyValue.NONE; - if (isRetryErrorStrategy(errorStrategy)) { + if (errorStrategy.isRetry()) { Duration retryDelay = errorStrategyAnnotation.get("retryDelay", Duration.class) .orElse(Duration.ofSeconds(ErrorStrategy.DEFAULT_DELAY_IN_SECONDS)); this.errorStrategyRetryDelay = retryDelay.isNegative() || retryDelay.isZero() ? null : retryDelay; @@ -209,8 +209,4 @@ synchronized void pauseTopicPartitions() { } _pausedTopicPartitions.addAll(validPauseRequests); } - - private static boolean isRetryErrorStrategy(ErrorStrategyValue currentErrorStrategy) { - return currentErrorStrategy == ErrorStrategyValue.RETRY_ON_ERROR || currentErrorStrategy == ErrorStrategyValue.RETRY_EXPONENTIALLY_ON_ERROR; - } } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java index a2400b34b..4c6956489 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java @@ -646,7 +646,7 @@ private boolean resolveWithErrorStrategy(ConsumerState consumerState, ErrorStrategyValue currentErrorStrategy = consumerState.errorStrategy; - if (isRetryErrorStrategy(currentErrorStrategy) && consumerState.errorStrategyExceptions.length > 0 && Arrays.stream(consumerState.errorStrategyExceptions).noneMatch(error -> error.equals(e.getClass()))) { + if (currentErrorStrategy.isRetry() && consumerState.errorStrategyExceptions.length > 0 && Arrays.stream(consumerState.errorStrategyExceptions).noneMatch(error -> error.equals(e.getClass()))) { if (consumerState.partitionRetries != null) { consumerState.partitionRetries.remove(consumerRecord.partition()); } @@ -654,7 +654,7 @@ private boolean resolveWithErrorStrategy(ConsumerState consumerState, currentErrorStrategy = ErrorStrategyValue.RESUME_AT_NEXT_RECORD; } - if (isRetryErrorStrategy(currentErrorStrategy) && consumerState.errorStrategyRetryCount != 0) { + if (currentErrorStrategy.isRetry() && consumerState.errorStrategyRetryCount != 0) { if (consumerState.partitionRetries == null) { consumerState.partitionRetries = new HashMap<>(); } @@ -675,7 +675,7 @@ private boolean resolveWithErrorStrategy(ConsumerState consumerState, TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), partition); consumerState.kafkaConsumer.seek(topicPartition, consumerRecord.offset()); - Duration retryDelay = computeRetryDelay(currentErrorStrategy, consumerState.errorStrategyRetryDelay, retryState.currentRetryCount); + Duration retryDelay = currentErrorStrategy.computeRetryDelay(consumerState.errorStrategyRetryDelay, retryState.currentRetryCount); if (retryDelay != null) { // in the stop on error strategy, pause the consumer and resume after the retryDelay duration Set paused = Collections.singleton(topicPartition); @@ -695,20 +695,6 @@ private boolean resolveWithErrorStrategy(ConsumerState consumerState, return currentErrorStrategy != ErrorStrategyValue.RESUME_AT_NEXT_RECORD; } - private static boolean isRetryErrorStrategy(ErrorStrategyValue currentErrorStrategy) { - return currentErrorStrategy == ErrorStrategyValue.RETRY_ON_ERROR || currentErrorStrategy == ErrorStrategyValue.RETRY_EXPONENTIALLY_ON_ERROR; - } - - private Duration computeRetryDelay(ErrorStrategyValue errorStrategy, Duration fixedRetryDelay, long retryAttempts) { - if (errorStrategy == ErrorStrategyValue.RETRY_ON_ERROR) { - return fixedRetryDelay; - } else if (errorStrategy == ErrorStrategyValue.RETRY_EXPONENTIALLY_ON_ERROR) { - return fixedRetryDelay.multipliedBy(1L << (retryAttempts - 1)); - } else { - return Duration.ZERO; - } - } - private boolean processConsumerRecordsAsBatch(final ConsumerState consumerState, final ExecutableMethod method, final Map, Object> boundArguments,