Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.kafka;

import org.springframework.core.NestedRuntimeException;
import org.springframework.lang.Nullable;

/**
* The Spring Kafka specific {@link NestedRuntimeException} implementation.
Expand All @@ -31,7 +32,7 @@ public KafkaException(String message) {
super(message);
}

public KafkaException(String message, Throwable cause) {
public KafkaException(String message, @Nullable Throwable cause) {
super(message, cause);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,18 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
@SuppressWarnings(RAWTYPES) @Nullable Producer producer,
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException e) {

Exception toHandle = e;
if (toHandle instanceof ListenerExecutionFailedException) {
toHandle = new ListenerExecutionFailedException(toHandle.getMessage(), this.consumerGroupId,
toHandle.getCause());
}
else {
/*
* TODO: in 2.3, wrap all exceptions (e.g. thrown by user implementations
* of MessageListener) in LEFE with groupId. @KafkaListeners always throw
* LEFE.
*/
}
if (this.errorHandler instanceof RemainingRecordsErrorHandler) {
if (producer == null) {
processCommits();
Expand All @@ -1252,11 +1264,11 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
while (iterator.hasNext()) {
records.add(iterator.next());
}
((RemainingRecordsErrorHandler) this.errorHandler).handle(e, records, this.consumer,
((RemainingRecordsErrorHandler) this.errorHandler).handle(toHandle, records, this.consumer,
KafkaMessageListenerContainer.this.container);
}
else {
this.errorHandler.handle(e, record, this.consumer);
this.errorHandler.handle(toHandle, record, this.consumer);
}
if (producer != null) {
ackCurrent(record, producer);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.kafka.listener;

import org.springframework.kafka.KafkaException;
import org.springframework.lang.Nullable;

/**
* The listener specific {@link KafkaException} extension.
Expand All @@ -26,12 +27,46 @@
@SuppressWarnings("serial")
public class ListenerExecutionFailedException extends KafkaException {

private final String groupId;

/**
* Construct an instance with the provided properties.
* @param message the exception message.
*/
public ListenerExecutionFailedException(String message) {
super(message);
this(message, null, null);
}

/**
* Construct an instance with the provided properties.
* @param message the exception message.
* @param cause the cause.
*/
public ListenerExecutionFailedException(String message, @Nullable Throwable cause) {
this(message, null, cause);
}

public ListenerExecutionFailedException(String message, Throwable cause) {
/**
* Construct an instance with the provided properties.
* @param message the exception message.
* @param groupId the container's group.id property.
* @param cause the cause.
* @since 2.2.4
*/
public ListenerExecutionFailedException(String message, @Nullable String groupId, @Nullable Throwable cause) {
super(message, cause);
this.groupId = groupId;
}

/**
* Return the consumer group.id property of the container that threw this exception.
* @return the group id; may be null, but not when the exception is passed to an error
* handler by a listener container.
* @since 2.2.4
*/
@Nullable
public String getGroupId() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still @Nullable?
As well an appropriate message in its Javadocs

return this.groupId;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017 the original author or authors.
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -88,6 +88,7 @@ public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exce
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer, MessageListenerContainer container) {

if (!SeekUtils.doSeeks(records, consumer, thrownException, true, this.failureTracker::skip, logger)) {
throw new KafkaException("Seek to current after exception", thrownException);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 the original author or authors.
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,6 +57,7 @@ private SeekUtils() {
*/
public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, Exception exception,
boolean recoverable, BiPredicate<ConsumerRecord<?, ?>, Exception> skipper, Log logger) {

Map<TopicPartition, Long> partitions = new LinkedHashMap<>();
AtomicBoolean first = new AtomicBoolean(true);
AtomicBoolean skipped = new AtomicBoolean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,9 @@ public void testConverterBean() throws Exception {
template.send("annotated32", 0, 1, "foobar");
assertThat(this.config.listen16ErrorLatch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.listen16Exception).isNotNull();
assertThat(this.config.listen16Exception).isInstanceOf(ListenerExecutionFailedException.class);
assertThat(((ListenerExecutionFailedException) this.config.listen16Exception).getGroupId())
.isEqualTo("converter.explicitGroupId");
assertThat(this.config.listen16Message).isEqualTo("foobar");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 the original author or authors.
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,7 +72,6 @@ public class SeekToCurrentRecovererTests {
public void testMaxFailures() throws Exception {
Map<String, Object> props = KafkaTestUtils.consumerProps("seekTestMaxFailures", "false", embeddedKafka);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic1);
Expand All @@ -87,7 +86,7 @@ public void testMaxFailures() throws Exception {
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
data.set(message.value());
if (message.offset() == 0) {
throw new RuntimeException("fail for max failures");
throw new ListenerExecutionFailedException("fail for max failures");
}
latch.countDown();
});
Expand All @@ -96,12 +95,16 @@ public void testMaxFailures() throws Exception {
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testSeekMaxFailures");
final CountDownLatch recoverLatch = new CountDownLatch(1);
final AtomicReference<String> failedGroupId = new AtomicReference<>();
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> new TopicPartition(topic1DLT, r.partition())) {

@Override
public void accept(ConsumerRecord<?, ?> record, Exception exception) {
super.accept(record, exception);
if (exception instanceof ListenerExecutionFailedException) {
failedGroupId.set(((ListenerExecutionFailedException) exception).getGroupId());
}
recoverLatch.countDown();
}

Expand All @@ -122,6 +125,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(data.get()).isEqualTo("bar");
assertThat(recoverLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(failedGroupId.get()).isEqualTo("seekTestMaxFailures");
container.stop();
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic1DLT);
Expand Down
3 changes: 3 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2527,6 +2527,9 @@ It also, optionally, can be configured with a `BiFunction<ConsumerRecord<?, ?>,
By default, the dead-letter record is sent to a topic named `<originalTopic>.DLT` (the original topic name suffixed with `.DLT`) and to the same partition as the original record.
Therefore, when using the default resolver, the dead-letter topic must have at least as many partitions as the original topic.
If the returned `TopicPartition` has a negative partition, the partition is not set in the `ProducerRecord` and so the partition will be selected by Kafka.
Starting with version 2.2.4, any `ListenerExecutionFailedException` (e.g. thrown when an exception is detected in a `@KafkaListener` method) will be enhanced with the `groupId` property.
This will allow the destination resolver to use this in addition to the information in the `ConsumerRecord` to select the dead letter topic.

The following is an example of wiring a custom destination resolver.

====
Expand Down
2 changes: 2 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ See <<batch-listeners>> for more information.
The `DefaultAfterRollbackProcessor` and `SeekToCurrentErrorHandler` can now recover (skip) records that keep failing, and will do so after 10 failures, by default.
They can be configured to publish failed records to a dead-letter topic.

Starting with version 2.2.4, the consumer's group id can be used while selecting the dead letter topic name.

See <<after-rollback>>, <<seek-to-current>> and <<dead-letters>> for more information.

The `ConsumerStoppingEvent` has been added.
Expand Down