Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Catch record deserialization exceptions that may happen while polling #771

Merged
2 commits were merged on Jul 20, 2023.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
Expand Down Expand Up @@ -458,7 +459,28 @@ private void createConsumerThreadPollLoop(final ExecutableMethod<?, ?> method,
boolean failed = true;
try {
consumerState.pauseTopicPartitions();
final ConsumerRecords<?, ?> consumerRecords = kafkaConsumer.poll(pollTimeout);
final ConsumerRecords<?, ?> consumerRecords;
// Deserialization errors can happen while polling
if (!isBatch) {
// Unless in batch mode, try to honor the configured error strategy
try {
consumerRecords = kafkaConsumer.poll(pollTimeout);
} catch (RecordDeserializationException ex) {
if (LOG.isTraceEnabled()) {
LOG.trace("Kafka consumer [{}] failed to deserialize value while polling", logMethod(method), ex);
}
final TopicPartition tp = ex.topicPartition();
// By default, seek past the record to continue consumption
consumerState.kafkaConsumer.seek(tp, ex.offset() + 1);
jeremyg484 marked this conversation as resolved.
Show resolved Hide resolved
// The error strategy and the exception handler can still decide what to do about this record
resolveWithErrorStrategy(consumerState, new ConsumerRecord<>(tp.topic(), tp.partition(), ex.offset(), null, null), ex);
// By now, it's been decided whether this record should be retried and the exception may have been handled
continue;
}
} else {
// Otherwise, propagate any errors
consumerRecords = kafkaConsumer.poll(pollTimeout);
}
consumerState.closedState = ConsumerCloseState.POLLING;
failed = true;
consumerState.resumeTopicPartitions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,22 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
myConsumer.times[1] - myConsumer.times[0] >= 50
}

// Verifies that a RecordDeserializationException raised during poll() is routed through
// the RETRY_ON_ERROR strategy: the bad record is retried, then skipped, and a later
// well-formed record is still consumed.
void "test when the error strategy is 'retry on error' and there are serialization errors"() {
    when: "A record cannot be deserialized"
    DeserializationErrorClient myClient = context.getBean(DeserializationErrorClient)
    myClient.sendText("Not an integer")
    myClient.sendNumber(123)

    RetryOnErrorDeserializationErrorConsumer myConsumer = context.getBean(RetryOnErrorDeserializationErrorConsumer)

    then: "The message that threw the exception is eventually left behind"
    conditions.eventually {
        myConsumer.number == 123
    }

    and: "the retry error strategy is honored"
    // Two handler invocations expected: the original failure plus one retry.
    myConsumer.exceptionCount.get() == 2
}

void "test when the error strategy is 'retry on error' and retry failed, the finished messages should be complete except the failed message"() {
when:"A client sends a lot of messages to a same topic"
RetryErrorMultiplePartitionsClient myClient = context.getBean(RetryErrorMultiplePartitionsClient)
Expand Down Expand Up @@ -298,6 +314,27 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
}
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaListener(
    offsetReset = EARLIEST,
    value = "errors-retry-deserialization-error",
    errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, handleAllExceptions = true)
)
static class RetryOnErrorDeserializationErrorConsumer implements KafkaListenerExceptionHandler {
    // Written by the Kafka consumer thread and polled by the test thread inside
    // `conditions.eventually`; `volatile` guarantees the write is visible across threads.
    volatile int number = 0
    // Counts every exception delivered to the handler (original failure + retries).
    AtomicInteger exceptionCount = new AtomicInteger(0)

    // Records the last successfully deserialized payload.
    @Topic("deserialization-errors-retry")
    void handleMessage(int number) {
        this.number = number
    }

    // Custom exception handler so the spec can assert how many times the
    // deserialization failure was surfaced under the retry strategy.
    @Override
    void handle(KafkaListenerException exception) {
        exceptionCount.getAndIncrement()
    }
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaListener(
offsetReset = EARLIEST,
Expand Down Expand Up @@ -348,6 +385,16 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec {
void sendMessage(String message)
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaClient
static interface DeserializationErrorClient {
// Publishes a String payload to the topic the int-typed consumer reads — used by the
// spec to provoke a deserialization failure while the consumer is polling.
@Topic("deserialization-errors-retry")
void sendText(String text)

// Publishes a well-formed int payload that the consumer can deserialize normally.
@Topic("deserialization-errors-retry")
void sendNumber(int number)
}

@Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec')
@KafkaClient
static interface RetryErrorMultiplePartitionsClient {
Expand Down
Loading