Skip to content

Commit

Permalink
Add support for batch error strategy (#890)
Browse files Browse the repository at this point in the history
  • Loading branch information
guillermocalvo authored Nov 9, 2023
1 parent 6c291b2 commit 277d7c2
Show file tree
Hide file tree
Showing 6 changed files with 738 additions and 269 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package io.micronaut.configuration.kafka.exceptions;

import io.micronaut.core.annotation.Nullable;
import io.micronaut.messaging.exceptions.MessageListenerException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.Optional;

/**
Expand All @@ -29,9 +31,10 @@
@SuppressWarnings("WeakerAccess")
public class KafkaListenerException extends MessageListenerException {

private final Object listener;
private final Consumer<?, ?> kafkaConsumer;
private final ConsumerRecord<?, ?> consumerRecord;
private final transient Object listener;
private final transient Consumer<?, ?> kafkaConsumer;
private final transient ConsumerRecords<?, ?> consumerRecords;
private final transient ConsumerRecord<?, ?> consumerRecord;

/**
* Creates a new exception.
Expand All @@ -42,10 +45,7 @@ public class KafkaListenerException extends MessageListenerException {
* @param consumerRecord The consumer record
*/
public KafkaListenerException(String message, Object listener, Consumer<?, ?> kafkaConsumer, ConsumerRecord<?, ?> consumerRecord) {
super(message);
this.listener = listener;
this.kafkaConsumer = kafkaConsumer;
this.consumerRecord = consumerRecord;
this(message, null, listener, kafkaConsumer, consumerRecord);
}

/**
Expand All @@ -58,12 +58,9 @@ public KafkaListenerException(String message, Object listener, Consumer<?, ?> ka
* @param consumerRecord The consumer record
*/
public KafkaListenerException(String message, Throwable cause, Object listener, Consumer<?, ?> kafkaConsumer, ConsumerRecord<?, ?> consumerRecord) {
super(message, cause);
this.listener = listener;
this.kafkaConsumer = kafkaConsumer;
this.consumerRecord = consumerRecord;
this(message, cause, listener, kafkaConsumer, null, consumerRecord);
}

/**
* Creates a new exception.
*
Expand All @@ -73,9 +70,31 @@ public KafkaListenerException(String message, Throwable cause, Object listener,
* @param consumerRecord The consumer record
*/
public KafkaListenerException(Throwable cause, Object listener, Consumer<?, ?> kafkaConsumer, ConsumerRecord<?, ?> consumerRecord) {
super(cause.getMessage(), cause);
this(cause.getMessage(), cause, listener, kafkaConsumer, consumerRecord);
}

/**
* Creates a new exception.
*
* @param message The message
* @param cause The cause
* @param listener The listener
* @param kafkaConsumer The consumer
* @param consumerRecords The batch of consumer records
* @param consumerRecord The consumer record
*/
public KafkaListenerException(
String message,
Throwable cause,
Object listener,
Consumer<?, ?> kafkaConsumer,
@Nullable ConsumerRecords<?, ?> consumerRecords,
@Nullable ConsumerRecord<?, ?> consumerRecord
) {
super(message, cause);
this.listener = listener;
this.kafkaConsumer = kafkaConsumer;
this.consumerRecords = consumerRecords;
this.consumerRecord = consumerRecord;
}

Expand All @@ -89,14 +108,25 @@ public Object getKafkaListener() {
/**
* @return The consumer that produced the error
*/
@SuppressWarnings("java:S1452") // Remove usage of generic wildcard type
public Consumer<?, ?> getKafkaConsumer() {
return kafkaConsumer;
}

/**
* @return The consumer record that was being processed that caused the error
*/
@SuppressWarnings("java:S1452") // Remove usage of generic wildcard type
public Optional<ConsumerRecord<?, ?>> getConsumerRecord() {
return Optional.ofNullable(consumerRecord);
}

/**
* @return The batch of consumer records that was being processed that caused the error
* @since 5.3
*/
@SuppressWarnings("java:S1452") // Remove usage of generic wildcard type
public Optional<ConsumerRecords<?, ?>> getConsumerRecords() {
return Optional.ofNullable(consumerRecords);
}
}
Loading

0 comments on commit 277d7c2

Please sign in to comment.