Skip to content

Commit

Permalink
Introduce ErrorStrategy.retryCountValue to allow for dynamic config (
Browse files Browse the repository at this point in the history
  • Loading branch information
guillermocalvo authored Aug 4, 2023
1 parent fff6715 commit fd6d9fd
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.micronaut.configuration.kafka.annotation;

import io.micronaut.context.annotation.AliasFor;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
Expand Down Expand Up @@ -61,13 +63,28 @@
String retryDelay() default DEFAULT_DELAY_IN_SECONDS + "s";

/**
* The retry count used with RETRY_ON_ERROR and RETRY_EXPONENTIALLY_ON_ERROR
* The fixed retry count used with RETRY_ON_ERROR and RETRY_EXPONENTIALLY_ON_ERROR
* {@link io.micronaut.configuration.kafka.annotation.ErrorStrategyValue}.
*
* <p>{@code retryCount} takes precedence over {@code retryCountValue} if they are both set.
*
* @return the retry count of how many attempts should be made
* @see ErrorStrategy#retryCountValue()
*/
int retryCount() default DEFAULT_RETRY_COUNT;

/**
* The dynamic retry count used with RETRY_ON_ERROR and RETRY_EXPONENTIALLY_ON_ERROR
* {@link io.micronaut.configuration.kafka.annotation.ErrorStrategyValue}.
*
* <p>{@code retryCountValue} will be overridden by {@code retryCount} if they are both set.
*
* @return the retry count of how many attempts should be made
* @see ErrorStrategy#retryCount()
*/
@AliasFor(member = "retryCount")
String retryCountValue() default "";

/**
* Whether all exceptions should be handled or ignored when using RETRY_ON_ERROR and RETRY_EXPONENTIALLY_ON_ERROR
* {@link io.micronaut.configuration.kafka.annotation.ErrorStrategyValue}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import spock.lang.Unroll

import java.util.concurrent.atomic.AtomicInteger

Expand All @@ -30,7 +31,8 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {

Map<String, Object> getConfiguration() {
super.configuration +
["kafka.consumers.errors-retry-multiple-partitions.allow.auto.create.topics" : false]
["kafka.consumers.errors-retry-multiple-partitions.allow.auto.create.topics" : false,
"my.retry.count": "3"]
}

@Override
Expand Down Expand Up @@ -145,6 +147,29 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
myConsumer.errors[3].message == "Three #6"
}

@Unroll
void "test when error strategy is 'retry on error' with #type retry count"(String type) {
when: "A consumer throws an exception"
RetryCountClient myClient = context.getBean(RetryCountClient)
myClient.sendMessage("${type}-retry-count", "ERROR")
myClient.sendMessage("${type}-retry-count", "OK")

AbstractRetryCountConsumer myConsumer = context.getBean(consumerClass)

then: "Messages are consumed eventually"
conditions.eventually {
myConsumer.received == "OK"
}
and: "messages were retried the correct number of times"
myConsumer.errors.size() == expectedErrorCount

where:
type | consumerClass | expectedErrorCount
'fixed' | FixedRetryCountConsumer | 11
'dynamic' | DynamicRetryCountConsumer | 4
'mixed' | MixedRetryCountConsumer | 11
}

void "test simultaneous retry and consumer reassignment"() {
when: "A consumer throws an exception"
TimeoutAndRetryErrorClient myClient = context.getBean(TimeoutAndRetryErrorClient)
Expand Down Expand Up @@ -371,6 +396,57 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
}
}

static abstract class AbstractRetryCountConsumer implements KafkaListenerExceptionHandler {
List<KafkaListenerException> errors = []
String received

void receive(String message) {
if (message == 'ERROR') throw new RuntimeException("Won't handle this one")
received = message
}

@Override
void handle(KafkaListenerException exception) {
errors << exception
}
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaListener(
offsetReset = EARLIEST,
errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCount = 10, handleAllExceptions = true)
)
static class FixedRetryCountConsumer extends AbstractRetryCountConsumer {
@Topic("fixed-retry-count")
void receiveMessage(String message) {
receive(message)
}
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaListener(
offsetReset = EARLIEST,
errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCountValue = '${my.retry.count}', handleAllExceptions = true)
)
static class DynamicRetryCountConsumer extends AbstractRetryCountConsumer {
@Topic("dynamic-retry-count")
void receiveMessage(String message) {
receive(message)
}
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaListener(
offsetReset = EARLIEST,
errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCount = 10, retryCountValue = '${my.retry.count}', handleAllExceptions = true)
)
static class MixedRetryCountConsumer extends AbstractRetryCountConsumer {
@Topic("mixed-retry-count")
void receiveMessage(String message) {
receive(message)
}
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaClient
static interface ResumeErrorClient {
Expand Down Expand Up @@ -429,4 +505,10 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
@Topic("errors-timeout-and-retry")
void sendMessage(String message)
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaClient
static interface RetryCountClient {
void sendMessage(@Topic topic, String message)
}
}
10 changes: 10 additions & 0 deletions src/main/docs/guide/kafkaListener/kafkaErrors.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ You can choose one of the error strategies:
NOTE: The error strategies apply only for non-batch messages processing.

You can also make the number of retries configurable by using `retryCountValue`:

.Dynamically Configuring Retries
[source,java]
----
@KafkaListener(value = "myGroup", errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCountValue = "${my.retry.count"}))
----

NOTE: `retryCountValue` will be overridden by `retryCount` if they are both set.

==== Specify exceptions to retry

It's possible to define only exceptions from which the retry will occur.
Expand Down

0 comments on commit fd6d9fd

Please sign in to comment.