Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow reactive consumer methods with error strategy #977

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,7 @@ protected void handleResultFlux(
boolean isBlocking
) {
final Flux<RecordMetadata> recordMetadataProducer = publisher
.flatMap(value -> sendToDestination(value, consumerRecord, consumerRecords))
.onErrorResume(error -> handleSendToError(error, consumerRecords, consumerRecord));
.flatMap(value -> sendToDestination(value, consumerRecord, consumerRecords));

if (isBlocking) {
List<RecordMetadata> listRecords = recordMetadataProducer.collectList().block();
Expand Down Expand Up @@ -298,7 +297,8 @@ private Publisher<RecordMetadata> sendToDestination(Object value, ConsumerRecord
value.getClass()
);
}
return Flux.create(emitter -> sendToDestination(emitter, kafkaProducer, key, value, consumerRecord, consumerRecords));
Flux<RecordMetadata> result = Flux.create(emitter -> sendToDestination(emitter, kafkaProducer, key, value, consumerRecord, consumerRecords));
return result.onErrorResume(error -> handleSendToError(error, consumerRecords, consumerRecord));
}

private void sendToDestination(FluxSink<RecordMetadata> emitter, Producer<?, ?> kafkaProducer, Object key, Object value, ConsumerRecord<?, ?> consumerRecord, ConsumerRecords<?, ?> consumerRecords) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler
import io.micronaut.configuration.kafka.retry.ConditionalRetryBehaviourHandler
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.core.annotation.Blocking
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import spock.lang.Unroll

import java.util.concurrent.atomic.AtomicInteger
Expand Down Expand Up @@ -269,6 +271,28 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
myConsumer.errors[3].message == "Three #6"
}

// Verifies RETRY_ON_ERROR (retryCount = 2) with handleAllExceptions = true for a
// reactive (Mono-returning) consumer. Per RetryReactiveHandleAllErrorCausingConsumer,
// the second record overall ("Two") fails once and every "Three" record fails, so
// "Two" is retried once and "Three" is retried twice before being given up on —
// producing 6 total deliveries and 4 handled exceptions.
void "test reactive consumer when error strategy is 'retry on error' and 'handle all exceptions' is true"() {
    when: "A reactive consumer signals an error"
    RetryReactiveHandleAllErrorClient myClient = context.getBean(RetryReactiveHandleAllErrorClient)
    myClient.sendMessage("One")
    myClient.sendMessage("Two")
    myClient.sendMessage("Three")

    RetryReactiveHandleAllErrorCausingConsumer myConsumer = context.getBean(RetryReactiveHandleAllErrorCausingConsumer)

    then: "Messages are consumed eventually"
    conditions.eventually {
        // Retried deliveries appear again in consumption order.
        myConsumer.received == ["One", "Two", "Two", "Three", "Three", "Three"]
        myConsumer.count.get() == 6
    }
    and: "messages were retried and all exceptions were handled"
    // The "#N" suffix is the 1-based invocation number at which each failure occurred.
    myConsumer.errors.size() == 4
    myConsumer.errors[0].message == "Two #2"
    myConsumer.errors[1].message == "Three #4"
    myConsumer.errors[2].message == "Three #5"
    myConsumer.errors[3].message == "Three #6"
}

@Unroll
void "test when error strategy is 'retry on error' with #type retry count"(String type) {
when: "A consumer throws an exception"
Expand Down Expand Up @@ -576,6 +600,33 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
}
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaListener(
    offsetReset = EARLIEST,
    offsetStrategy = SYNC,
    errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCount = 2, handleAllExceptions = true)
)
// Test fixture: a reactive consumer that deliberately fails on chosen records so the
// spec can assert retry behaviour when handleAllExceptions = true.
static class RetryReactiveHandleAllErrorCausingConsumer implements KafkaListenerExceptionHandler {
    // Total number of handleMessage invocations, including retried deliveries.
    AtomicInteger count = new AtomicInteger(0)
    // Every payload seen, in consumption order (retries appear again).
    List<String> received = []
    // Exceptions routed to this listener's handle() callback.
    List<KafkaListenerException> errors = []

    // Reactive consumer method returning a Mono. Fails on the second invocation
    // overall, and on every "Three" record, to exercise retryCount = 2.
    // NOTE(review): @Blocking appears required when combining reactive consumer
    // methods with retry error strategies — matches the kafkaErrors.adoc note.
    @Blocking
    @Topic("errors-retry-reactive-handle-all-exceptions")
    Mono<Boolean> handleMessage(String message) {
        received << message
        if (count.getAndIncrement() == 1 || message == 'Three') {
            // count was already incremented above, so the message embeds the
            // 1-based invocation number, e.g. "Two #2", "Three #4".
            return Mono.error(new RuntimeException("${message} #${count}"))
        }
        return Mono.just(Boolean.TRUE)
    }

    // Record every exception delivered to this listener so the spec can assert on it.
    @Override
    void handle(KafkaListenerException exception) {
        errors << exception
    }
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = SYNC, errorStrategy = @ErrorStrategy(value = NONE))
static class PollNextErrorCausingConsumer implements KafkaListenerExceptionHandler {
Expand Down Expand Up @@ -852,6 +903,13 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
void sendMessage(String message)
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaClient
// Producer-side counterpart of RetryReactiveHandleAllErrorCausingConsumer; publishes
// to the same topic so the spec can drive the error-strategy scenario.
static interface RetryReactiveHandleAllErrorClient {
    @Topic("errors-retry-reactive-handle-all-exceptions")
    void sendMessage(String message)
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaClient
static interface PollNextErrorClient {
Expand Down
2 changes: 2 additions & 0 deletions src/main/docs/guide/kafkaListener/kafkaErrors.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ You can choose one of the error strategies:

NOTE: The error strategies apply only for non-batch messages processing.

NOTE: When using a retry error strategy in combination with a reactive consumer method, you must annotate the reactive consumer method with `@Blocking`.

You can also make the number of retries configurable by using `retryCountValue`:

.Dynamically Configuring Retries
Expand Down
Loading