Skip to content

Commit

Permalink
Refactor ErrorStrategyValue
Browse files Browse the repository at this point in the history
  • Loading branch information
guillermocalvo committed Sep 18, 2023
1 parent 0f9dfde commit cd628d0
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.micronaut.configuration.kafka.annotation;

import java.time.Duration;

/**
* Defines the type of error handling strategy that micronaut-kafka will perform in case
* of error. The default exception handler or any custom exception handler will be performed
Expand Down Expand Up @@ -54,5 +56,30 @@ public enum ErrorStrategyValue {
* See https://github.com/micronaut-projects/micronaut-kafka/issues/372
*/
@Deprecated
NONE
NONE;

/**
*
* @return Whether this is a retry error strategy.
*/
public boolean isRetry() {
return this == RETRY_ON_ERROR || this == RETRY_EXPONENTIALLY_ON_ERROR;
}

/**
* Compute retry delay given a fixed delay and the number of attempts.
*
* @param fixedRetryDelay The fixed retry delay.
* @param retryAttempts The number of retries so far.
* @return The amount of time to wait before trying again.
*/
public Duration computeRetryDelay(Duration fixedRetryDelay, long retryAttempts) {
if (!isRetry()) {
return Duration.ZERO;
}
if (this == ErrorStrategyValue.RETRY_EXPONENTIALLY_ON_ERROR) {
return fixedRetryDelay.multipliedBy(1L << (retryAttempts - 1));
}
return fixedRetryDelay;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ final class ConsumerState {
? errorStrategyAnnotation.getRequiredValue(ErrorStrategyValue.class)
: ErrorStrategyValue.NONE;

if (isRetryErrorStrategy(errorStrategy)) {
if (errorStrategy.isRetry()) {
Duration retryDelay = errorStrategyAnnotation.get("retryDelay", Duration.class)
.orElse(Duration.ofSeconds(ErrorStrategy.DEFAULT_DELAY_IN_SECONDS));
this.errorStrategyRetryDelay = retryDelay.isNegative() || retryDelay.isZero() ? null : retryDelay;
Expand Down Expand Up @@ -209,8 +209,4 @@ synchronized void pauseTopicPartitions() {
}
_pausedTopicPartitions.addAll(validPauseRequests);
}

private static boolean isRetryErrorStrategy(ErrorStrategyValue currentErrorStrategy) {
return currentErrorStrategy == ErrorStrategyValue.RETRY_ON_ERROR || currentErrorStrategy == ErrorStrategyValue.RETRY_EXPONENTIALLY_ON_ERROR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -646,15 +646,15 @@ private boolean resolveWithErrorStrategy(ConsumerState consumerState,

ErrorStrategyValue currentErrorStrategy = consumerState.errorStrategy;

if (isRetryErrorStrategy(currentErrorStrategy) && consumerState.errorStrategyExceptions.length > 0 && Arrays.stream(consumerState.errorStrategyExceptions).noneMatch(error -> error.equals(e.getClass()))) {
if (currentErrorStrategy.isRetry() && consumerState.errorStrategyExceptions.length > 0 && Arrays.stream(consumerState.errorStrategyExceptions).noneMatch(error -> error.equals(e.getClass()))) {
if (consumerState.partitionRetries != null) {
consumerState.partitionRetries.remove(consumerRecord.partition());
}
// Skip the failing record
currentErrorStrategy = ErrorStrategyValue.RESUME_AT_NEXT_RECORD;
}

if (isRetryErrorStrategy(currentErrorStrategy) && consumerState.errorStrategyRetryCount != 0) {
if (currentErrorStrategy.isRetry() && consumerState.errorStrategyRetryCount != 0) {
if (consumerState.partitionRetries == null) {
consumerState.partitionRetries = new HashMap<>();
}
Expand All @@ -675,7 +675,7 @@ private boolean resolveWithErrorStrategy(ConsumerState consumerState,
TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), partition);
consumerState.kafkaConsumer.seek(topicPartition, consumerRecord.offset());

Duration retryDelay = computeRetryDelay(currentErrorStrategy, consumerState.errorStrategyRetryDelay, retryState.currentRetryCount);
Duration retryDelay = currentErrorStrategy.computeRetryDelay(consumerState.errorStrategyRetryDelay, retryState.currentRetryCount);
if (retryDelay != null) {
// in the stop on error strategy, pause the consumer and resume after the retryDelay duration
Set<TopicPartition> paused = Collections.singleton(topicPartition);
Expand All @@ -695,20 +695,6 @@ private boolean resolveWithErrorStrategy(ConsumerState consumerState,
return currentErrorStrategy != ErrorStrategyValue.RESUME_AT_NEXT_RECORD;
}

private static boolean isRetryErrorStrategy(ErrorStrategyValue currentErrorStrategy) {
return currentErrorStrategy == ErrorStrategyValue.RETRY_ON_ERROR || currentErrorStrategy == ErrorStrategyValue.RETRY_EXPONENTIALLY_ON_ERROR;
}

private Duration computeRetryDelay(ErrorStrategyValue errorStrategy, Duration fixedRetryDelay, long retryAttempts) {
if (errorStrategy == ErrorStrategyValue.RETRY_ON_ERROR) {
return fixedRetryDelay;
} else if (errorStrategy == ErrorStrategyValue.RETRY_EXPONENTIALLY_ON_ERROR) {
return fixedRetryDelay.multipliedBy(1L << (retryAttempts - 1));
} else {
return Duration.ZERO;
}
}

private boolean processConsumerRecordsAsBatch(final ConsumerState consumerState,
final ExecutableMethod<?, ?> method,
final Map<Argument<?>, Object> boundArguments,
Expand Down

0 comments on commit cd628d0

Please sign in to comment.