Skip to content

Commit

Permalink
Add support for batch error strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
guillermocalvo committed Oct 4, 2023
1 parent f2733e8 commit 0a92e19
Show file tree
Hide file tree
Showing 5 changed files with 464 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package io.micronaut.configuration.kafka.exceptions;

import io.micronaut.core.annotation.Nullable;
import io.micronaut.messaging.exceptions.MessageListenerException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.Optional;

/**
Expand All @@ -31,6 +33,7 @@ public class KafkaListenerException extends MessageListenerException {

private final Object listener;
private final Consumer<?, ?> kafkaConsumer;
private final ConsumerRecords<?, ?> consumerRecords;
private final ConsumerRecord<?, ?> consumerRecord;

/**
Expand All @@ -42,10 +45,7 @@ public class KafkaListenerException extends MessageListenerException {
* @param consumerRecord The consumer record
*/
public KafkaListenerException(String message, Object listener, Consumer<?, ?> kafkaConsumer, ConsumerRecord<?, ?> consumerRecord) {
super(message);
this.listener = listener;
this.kafkaConsumer = kafkaConsumer;
this.consumerRecord = consumerRecord;
this(message, null, listener, kafkaConsumer, consumerRecord);
}

/**
Expand All @@ -58,12 +58,9 @@ public KafkaListenerException(String message, Object listener, Consumer<?, ?> ka
* @param consumerRecord The consumer record
*/
public KafkaListenerException(String message, Throwable cause, Object listener, Consumer<?, ?> kafkaConsumer, ConsumerRecord<?, ?> consumerRecord) {
super(message, cause);
this.listener = listener;
this.kafkaConsumer = kafkaConsumer;
this.consumerRecord = consumerRecord;
this(message, cause, listener, kafkaConsumer, null, consumerRecord);
}

/**
* Creates a new exception.
*
Expand All @@ -73,9 +70,31 @@ public KafkaListenerException(String message, Throwable cause, Object listener,
* @param consumerRecord The consumer record
*/
public KafkaListenerException(Throwable cause, Object listener, Consumer<?, ?> kafkaConsumer, ConsumerRecord<?, ?> consumerRecord) {
super(cause.getMessage(), cause);
this(cause.getMessage(), cause, listener, kafkaConsumer, consumerRecord);
}

/**
* Creates a new exception.
*
* @param message The message
* @param cause The cause
* @param listener The listener
* @param kafkaConsumer The consumer
* @param consumerRecords The batch of consumer records
* @param consumerRecord The consumer record
*/
public KafkaListenerException(
String message,
Throwable cause,
Object listener,
Consumer<?, ?> kafkaConsumer,
@Nullable ConsumerRecords<?, ?> consumerRecords,
@Nullable ConsumerRecord<?, ?> consumerRecord
) {
super(message, cause);
this.listener = listener;
this.kafkaConsumer = kafkaConsumer;
this.consumerRecords = consumerRecords;
this.consumerRecord = consumerRecord;
}

Expand All @@ -99,4 +118,12 @@ public Object getKafkaListener() {
public Optional<ConsumerRecord<?, ?>> getConsumerRecord() {
return Optional.ofNullable(consumerRecord);
}

/**
* @return The batch of consumer records that was being processed that caused the error
* @since 5.3
*/
public Optional<ConsumerRecords<?, ?>> getConsumerRecords() {
return Optional.ofNullable(consumerRecords);
}
}
Loading

0 comments on commit 0a92e19

Please sign in to comment.