Skip to content

Commit

Permalink
feat(micronaut-projectsGH-604): implement exponential retry error str…
Browse files Browse the repository at this point in the history
…ategy
  • Loading branch information
breader124 committed Dec 29, 2022
1 parent 48b7e9a commit f99a004
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -566,15 +566,15 @@ private boolean resolveWithErrorStrategy(ConsumerState consumerState,

ErrorStrategyValue currentErrorStrategy = consumerState.errorStrategy;

if (currentErrorStrategy == ErrorStrategyValue.RETRY_ON_ERROR && consumerState.errorStrategyExceptions.length > 0 && Arrays.stream(consumerState.errorStrategyExceptions).noneMatch(error -> error.equals(e.getClass()))) {
if (isRetryErrorStrategy(currentErrorStrategy) && consumerState.errorStrategyExceptions.length > 0 && Arrays.stream(consumerState.errorStrategyExceptions).noneMatch(error -> error.equals(e.getClass()))) {
if (consumerState.partitionRetries != null) {
consumerState.partitionRetries.remove(consumerRecord.partition());
}
// Skip the failing record
currentErrorStrategy = ErrorStrategyValue.RESUME_AT_NEXT_RECORD;
}

if (currentErrorStrategy == ErrorStrategyValue.RETRY_ON_ERROR && consumerState.errorStrategyRetryCount != 0) {
if (isRetryErrorStrategy(currentErrorStrategy) && consumerState.errorStrategyRetryCount != 0) {
if (consumerState.partitionRetries == null) {
consumerState.partitionRetries = new HashMap<>();
}
Expand All @@ -591,7 +591,7 @@ private boolean resolveWithErrorStrategy(ConsumerState consumerState,
TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), partition);
consumerState.kafkaConsumer.seek(topicPartition, consumerRecord.offset());

Duration retryDelay = consumerState.errorStrategyRetryDelay;
Duration retryDelay = computeRetryDelay(currentErrorStrategy, consumerState.errorStrategyRetryDelay, retryState.currentRetryCount);
if (retryDelay != null) {
// in the stop on error strategy, pause the consumer and resume after the retryDelay duration
Set<TopicPartition> paused = Collections.singleton(topicPartition);
Expand All @@ -612,6 +612,20 @@ private boolean resolveWithErrorStrategy(ConsumerState consumerState,
return currentErrorStrategy != ErrorStrategyValue.RESUME_AT_NEXT_RECORD;
}

private static boolean isRetryErrorStrategy(ErrorStrategyValue currentErrorStrategy) {
return currentErrorStrategy == ErrorStrategyValue.RETRY_ON_ERROR || currentErrorStrategy == ErrorStrategyValue.RETRY_EXPONENTIALLY_ON_ERROR;
}

private Duration computeRetryDelay(ErrorStrategyValue errorStrategy, Duration fixedRetryDelay, long retryAttempts) {
if (errorStrategy == ErrorStrategyValue.RETRY_ON_ERROR) {
return fixedRetryDelay;
} else if (errorStrategy == ErrorStrategyValue.RETRY_EXPONENTIALLY_ON_ERROR) {
return fixedRetryDelay.multipliedBy(1L << (retryAttempts - 1));
} else {
return Duration.ZERO;
}
}

private boolean processConsumerRecordsAsBatch(final ConsumerState consumerState,
final ExecutableMethod<?, ?> method,
final Map<Argument<?>, Object> boundArguments,
Expand Down Expand Up @@ -1043,7 +1057,7 @@ private ConsumerState(String clientId, String groupId, OffsetStrategy offsetStra
.map(ErrorStrategy::value)
.orElse(ErrorStrategyValue.NONE);

if (errorStrategy == ErrorStrategyValue.RETRY_ON_ERROR) {
if (isRetryErrorStrategy(errorStrategy)) {
Duration retryDelay = errorStrategyAnnotation.get("retryDelay", Duration.class)
.orElse(Duration.ofSeconds(ErrorStrategy.DEFAULT_DELAY_IN_SECONDS));
this.errorStrategyRetryDelay = retryDelay.isNegative() || retryDelay.isZero() ? null : retryDelay;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,15 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {

void "test when error strategy is 'retry exponentially on error' then message is retried with exponential backoff"() {
when: "A consumer throws an exception"
RetryErrorClient myClient = context.getBean(RetryErrorClient)
ExpRetryErrorClient myClient = context.getBean(ExpRetryErrorClient)
myClient.sendMessage("One")
myClient.sendMessage("Two")

RetryExpOnErrorErrorCausingConsumer myConsumer = context.getBean(RetryExpOnErrorErrorCausingConsumer)

then: "All messages are consumed eventually"
conditions.eventually {
myConsumer.received == ["One", "One", "One", "One", "Two"]
myConsumer.count.get() == 5
myConsumer.received == ["One", "One", "One", "One"]
myConsumer.count.get() == 4
}
and: "First message was retried with exponential breaks between deliveries"
myConsumer.times[1] - myConsumer.times[0] >= 50
Expand Down Expand Up @@ -158,18 +157,18 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
@KafkaListener(
offsetReset = EARLIEST,
offsetStrategy = SYNC,
errorStrategy = @ErrorStrategy(value = RETRY_EXPONENTIALLY_ON_ERROR, retryDelay = "50ms")
errorStrategy = @ErrorStrategy(value = RETRY_EXPONENTIALLY_ON_ERROR, retryCount = 3, retryDelay = "50ms")
)
static class RetryExpOnErrorErrorCausingConsumer {
AtomicInteger count = new AtomicInteger(0)
List<String> received = []
List<Long> times = []

@Topic("errors-retry")
@Topic("errors-exp-retry")
void handleMessage(String message) {
received << message
times << System.currentTimeMillis()
if (count.getAndIncrement() < 3) {
if (count.getAndIncrement() < 4) {
throw new RuntimeException("Won't handle first three delivery attempts")
}
}
Expand Down Expand Up @@ -250,6 +249,13 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
void sendMessage(String message)
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaClient
static interface ExpRetryErrorClient {
@Topic("errors-exp-retry")
void sendMessage(String message)
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaClient
static interface PollNextErrorClient {
Expand Down

0 comments on commit f99a004

Please sign in to comment.