Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-604: Introduce exponential backoff for ErrorStrategy retry delay #644

Merged
merged 1 commit into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,16 @@
int DEFAULT_RETRY_COUNT = 1;

/**
* The delay used with RETRY_ON_ERROR {@link io.micronaut.configuration.kafka.annotation.ErrorStrategyValue}.
* The delay used with RETRY_ON_ERROR and RETRY_EXPONENTIALLY_ON_ERROR
* {@link io.micronaut.configuration.kafka.annotation.ErrorStrategyValue}.
*
* @return the delay by which to wait for the next retry
*/
String retryDelay() default DEFAULT_DELAY_IN_SECONDS + "s";

/**
* The retry count used with RETRY_ON_ERROR {@link io.micronaut.configuration.kafka.annotation.ErrorStrategyValue}.
* The retry count used with RETRY_ON_ERROR and RETRY_EXPONENTIALLY_ON_ERROR
* {@link io.micronaut.configuration.kafka.annotation.ErrorStrategyValue}.
*
* @return the retry count of how many attempts should be made
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ public enum ErrorStrategyValue {
*/
RETRY_ON_ERROR,

/**
* This strategy will stop consuming subsequent records in the case of an error and will
* attempt to re-consume the current record with exponentially growing time breaks between
* consumption attempts. Breaks' duration is computed based on the n * 2^(k - 1) formula,
* where n is the initial delay, and k is the number of retries.
*/
RETRY_EXPONENTIALLY_ON_ERROR,

/**
* This strategy will ignore the current error and will resume at the next offset.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,15 +566,15 @@ private boolean resolveWithErrorStrategy(ConsumerState consumerState,

ErrorStrategyValue currentErrorStrategy = consumerState.errorStrategy;

if (currentErrorStrategy == ErrorStrategyValue.RETRY_ON_ERROR && consumerState.errorStrategyExceptions.length > 0 && Arrays.stream(consumerState.errorStrategyExceptions).noneMatch(error -> error.equals(e.getClass()))) {
if (isRetryErrorStrategy(currentErrorStrategy) && consumerState.errorStrategyExceptions.length > 0 && Arrays.stream(consumerState.errorStrategyExceptions).noneMatch(error -> error.equals(e.getClass()))) {
if (consumerState.partitionRetries != null) {
consumerState.partitionRetries.remove(consumerRecord.partition());
}
// Skip the failing record
currentErrorStrategy = ErrorStrategyValue.RESUME_AT_NEXT_RECORD;
}

if (currentErrorStrategy == ErrorStrategyValue.RETRY_ON_ERROR && consumerState.errorStrategyRetryCount != 0) {
if (isRetryErrorStrategy(currentErrorStrategy) && consumerState.errorStrategyRetryCount != 0) {
if (consumerState.partitionRetries == null) {
consumerState.partitionRetries = new HashMap<>();
}
Expand All @@ -591,7 +591,7 @@ private boolean resolveWithErrorStrategy(ConsumerState consumerState,
TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), partition);
consumerState.kafkaConsumer.seek(topicPartition, consumerRecord.offset());

Duration retryDelay = consumerState.errorStrategyRetryDelay;
Duration retryDelay = computeRetryDelay(currentErrorStrategy, consumerState.errorStrategyRetryDelay, retryState.currentRetryCount);
if (retryDelay != null) {
// in the stop on error strategy, pause the consumer and resume after the retryDelay duration
Set<TopicPartition> paused = Collections.singleton(topicPartition);
Expand All @@ -612,6 +612,20 @@ private boolean resolveWithErrorStrategy(ConsumerState consumerState,
return currentErrorStrategy != ErrorStrategyValue.RESUME_AT_NEXT_RECORD;
}

private static boolean isRetryErrorStrategy(ErrorStrategyValue currentErrorStrategy) {
return currentErrorStrategy == ErrorStrategyValue.RETRY_ON_ERROR || currentErrorStrategy == ErrorStrategyValue.RETRY_EXPONENTIALLY_ON_ERROR;
}

private Duration computeRetryDelay(ErrorStrategyValue errorStrategy, Duration fixedRetryDelay, long retryAttempts) {
if (errorStrategy == ErrorStrategyValue.RETRY_ON_ERROR) {
return fixedRetryDelay;
} else if (errorStrategy == ErrorStrategyValue.RETRY_EXPONENTIALLY_ON_ERROR) {
return fixedRetryDelay.multipliedBy(1L << (retryAttempts - 1));
} else {
return Duration.ZERO;
}
}

private boolean processConsumerRecordsAsBatch(final ConsumerState consumerState,
final ExecutableMethod<?, ?> method,
final Map<Argument<?>, Object> boundArguments,
Expand Down Expand Up @@ -1043,7 +1057,7 @@ private ConsumerState(String clientId, String groupId, OffsetStrategy offsetStra
.map(ErrorStrategy::value)
.orElse(ErrorStrategyValue.NONE);

if (errorStrategy == ErrorStrategyValue.RETRY_ON_ERROR) {
if (isRetryErrorStrategy(errorStrategy)) {
Duration retryDelay = errorStrategyAnnotation.get("retryDelay", Duration.class)
.orElse(Duration.ofSeconds(ErrorStrategy.DEFAULT_DELAY_IN_SECONDS));
this.errorStrategyRetryDelay = retryDelay.isNegative() || retryDelay.isZero() ? null : retryDelay;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import io.micronaut.configuration.kafka.AbstractEmbeddedServerSpec
import io.micronaut.configuration.kafka.annotation.ErrorStrategy
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler
Expand All @@ -19,6 +18,7 @@ import java.util.concurrent.atomic.AtomicInteger

import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.NONE
import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.RESUME_AT_NEXT_RECORD
import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.RETRY_EXPONENTIALLY_ON_ERROR
import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.RETRY_ON_ERROR
import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.SYNC
Expand Down Expand Up @@ -59,6 +59,24 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
myConsumer.times[1] - myConsumer.times[0] >= 50
}

void "test when error strategy is 'retry exponentially on error' then message is retried with exponential backoff"() {
when: "A consumer throws an exception"
ExpRetryErrorClient myClient = context.getBean(ExpRetryErrorClient)
myClient.sendMessage("One")

RetryExpOnErrorErrorCausingConsumer myConsumer = context.getBean(RetryExpOnErrorErrorCausingConsumer)

then: "Message is consumed eventually"
conditions.eventually {
myConsumer.received == ["One", "One", "One", "One"]
myConsumer.count.get() == 4
}
and: "message was retried with exponential breaks between deliveries"
myConsumer.times[1] - myConsumer.times[0] >= 50
myConsumer.times[2] - myConsumer.times[1] >= 100
myConsumer.times[3] - myConsumer.times[2] >= 200
}

void "test simultaneous retry and consumer reassignment"() {
when: "A consumer throws an exception"
TimeoutAndRetryErrorClient myClient = context.getBean(TimeoutAndRetryErrorClient)
Expand Down Expand Up @@ -135,6 +153,27 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
}
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaListener(
offsetReset = EARLIEST,
offsetStrategy = SYNC,
errorStrategy = @ErrorStrategy(value = RETRY_EXPONENTIALLY_ON_ERROR, retryCount = 3, retryDelay = "50ms")
)
static class RetryExpOnErrorErrorCausingConsumer {
AtomicInteger count = new AtomicInteger(0)
List<String> received = []
List<Long> times = []

@Topic("errors-exp-retry")
void handleMessage(String message) {
received << message
times << System.currentTimeMillis()
if (count.getAndIncrement() < 4) {
throw new RuntimeException("Won't handle first three delivery attempts")
}
}
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = SYNC, errorStrategy = @ErrorStrategy(value = NONE))
static class PollNextErrorCausingConsumer implements KafkaListenerExceptionHandler {
Expand Down Expand Up @@ -210,6 +249,13 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
void sendMessage(String message)
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaClient
static interface ExpRetryErrorClient {
@Topic("errors-exp-retry")
void sendMessage(String message)
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaClient
static interface PollNextErrorClient {
Expand Down