Skip to content

Commit

Permalink
feat(micronaut-projectsGH-604): introduce tests for exponential backo…
Browse files Browse the repository at this point in the history
…ff retry strategy
  • Loading branch information
breader124 committed Dec 26, 2022
1 parent 81d7e08 commit 48b7e9a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,16 @@
int DEFAULT_RETRY_COUNT = 1;

/**
* The delay used with RETRY_ON_ERROR {@link io.micronaut.configuration.kafka.annotation.ErrorStrategyValue}.
* The delay used with RETRY_ON_ERROR and RETRY_EXPONENTIALLY_ON_ERROR
* {@link io.micronaut.configuration.kafka.annotation.ErrorStrategyValue}.
*
* @return the delay by which to wait for the next retry
*/
String retryDelay() default DEFAULT_DELAY_IN_SECONDS + "s";

/**
* The retry count used with RETRY_ON_ERROR {@link io.micronaut.configuration.kafka.annotation.ErrorStrategyValue}.
* The retry count used with RETRY_ON_ERROR and RETRY_EXPONENTIALLY_ON_ERROR
* {@link io.micronaut.configuration.kafka.annotation.ErrorStrategyValue}.
*
* @return the retry count of how many attempts should be made
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ public enum ErrorStrategyValue {
*/
RETRY_ON_ERROR,

/**
* This strategy will stop consuming subsequent records in the case of an error and will
* attempt to re-consume the current record indefinitely with exponentially growing breaks
* between consumption attempts. Breaks' duration is computed based on the n * 2^k formula,
* where n is the initial delay, and k is the number of retries.
*/
RETRY_EXPONENTIALLY_ON_ERROR,

/**
* This strategy will ignore the current error and will resume at the next offset.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import io.micronaut.configuration.kafka.AbstractEmbeddedServerSpec
import io.micronaut.configuration.kafka.annotation.ErrorStrategy
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler
Expand All @@ -19,6 +18,7 @@ import java.util.concurrent.atomic.AtomicInteger

import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.NONE
import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.RESUME_AT_NEXT_RECORD
import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.RETRY_EXPONENTIALLY_ON_ERROR
import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.RETRY_ON_ERROR
import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.SYNC
Expand Down Expand Up @@ -59,6 +59,25 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
myConsumer.times[1] - myConsumer.times[0] >= 50
}

void "test when error strategy is 'retry exponentially on error' then message is retried with exponential backoff"() {
when: "A consumer throws an exception"
RetryErrorClient myClient = context.getBean(RetryErrorClient)
myClient.sendMessage("One")
myClient.sendMessage("Two")

RetryExpOnErrorErrorCausingConsumer myConsumer = context.getBean(RetryExpOnErrorErrorCausingConsumer)

then: "All messages are consumed eventually"
conditions.eventually {
myConsumer.received == ["One", "One", "One", "One", "Two"]
myConsumer.count.get() == 5
}
and: "First message was retried with exponential breaks between deliveries"
myConsumer.times[1] - myConsumer.times[0] >= 50
myConsumer.times[2] - myConsumer.times[1] >= 100
myConsumer.times[3] - myConsumer.times[2] >= 200
}

void "test simultaneous retry and consumer reassignment"() {
when: "A consumer throws an exception"
TimeoutAndRetryErrorClient myClient = context.getBean(TimeoutAndRetryErrorClient)
Expand Down Expand Up @@ -135,6 +154,27 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
}
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaListener(
offsetReset = EARLIEST,
offsetStrategy = SYNC,
errorStrategy = @ErrorStrategy(value = RETRY_EXPONENTIALLY_ON_ERROR, retryDelay = "50ms")
)
static class RetryExpOnErrorErrorCausingConsumer {
AtomicInteger count = new AtomicInteger(0)
List<String> received = []
List<Long> times = []

@Topic("errors-retry")
void handleMessage(String message) {
received << message
times << System.currentTimeMillis()
if (count.getAndIncrement() < 3) {
throw new RuntimeException("Won't handle first three delivery attempts")
}
}
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = SYNC, errorStrategy = @ErrorStrategy(value = NONE))
static class PollNextErrorCausingConsumer implements KafkaListenerExceptionHandler {
Expand Down

0 comments on commit 48b7e9a

Please sign in to comment.