Skip to content

Commit d31e4d6

Browse files
garyrussellartembilan
authored andcommitted
GH-983: Seek remaining records
Resolves #983 When skipping non-retryable exceptions, we still need to seek the remaining topic/partitions.
1 parent 495e53b commit d31e4d6

File tree

2 files changed

+34
-15
lines changed

2 files changed

+34
-15
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.Map;
2323
import java.util.function.BiConsumer;
24+
import java.util.function.BiPredicate;
2425

2526
import org.apache.commons.logging.Log;
2627
import org.apache.commons.logging.LogFactory;
@@ -53,6 +54,8 @@
5354
*/
5455
public class SeekToCurrentErrorHandler implements ContainerAwareErrorHandler {
5556

57+
private static final BiPredicate<ConsumerRecord<?, ?>, Exception> ALWAYS_SKIP_PREDICATE = (r, e) -> true;
58+
5659
protected static final Log LOGGER = LogFactory.getLog(SeekToCurrentErrorHandler.class); // NOSONAR visibility
5760

5861
private static final LoggingCommitCallback LOGGING_COMMIT_CALLBACK = new LoggingCommitCallback();
@@ -101,9 +104,7 @@ public SeekToCurrentErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> rec
101104
* @param maxFailures the maxFailures; a negative value is treated as infinity.
102105
* @since 2.2
103106
*/
104-
public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer,
105-
int maxFailures) {
106-
107+
public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, int maxFailures) {
107108
this.failureTracker = new FailedRecordTracker(recoverer, maxFailures, LOGGER);
108109
this.classifier = configureDefaultClassifier();
109110
}
@@ -213,10 +214,8 @@ public boolean removeNotRetryableException(Class<? extends Exception> exceptionT
213214
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
214215
Consumer<?, ?> consumer, MessageListenerContainer container) {
215216

216-
if (!this.classifier.classify(thrownException)) {
217-
this.failureTracker.getRecoverer().accept(records.get(0), thrownException);
218-
}
219-
else if (!SeekUtils.doSeeks(records, consumer, thrownException, true, this.failureTracker::skip, LOGGER)) {
217+
if (!SeekUtils.doSeeks(records, consumer, thrownException, true, getSkipPredicate(records, thrownException),
218+
LOGGER)) {
220219
throw new KafkaException("Seek to current after exception", thrownException);
221220
}
222221
if (this.commitRecovered) {
@@ -242,6 +241,16 @@ else if (!SeekUtils.doSeeks(records, consumer, thrownException, true, this.failu
242241
}
243242
}
244243

244+
private BiPredicate<ConsumerRecord<?, ?>, Exception> getSkipPredicate(List<ConsumerRecord<?, ?>> records, Exception thrownException) {
245+
if (this.classifier.classify(thrownException)) {
246+
return this.failureTracker::skip;
247+
}
248+
else {
249+
this.failureTracker.getRecoverer().accept(records.get(0), thrownException);
250+
return ALWAYS_SKIP_PREDICATE;
251+
}
252+
}
253+
245254
@Override
246255
public void clearThreadState() {
247256
this.failureTracker.clearThreadState();

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,19 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
21+
import static org.mockito.Mockito.inOrder;
2122
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.times;
2224

23-
import java.util.Collections;
25+
import java.util.Arrays;
2426
import java.util.List;
2527
import java.util.concurrent.atomic.AtomicReference;
2628

2729
import org.apache.kafka.clients.consumer.Consumer;
2830
import org.apache.kafka.clients.consumer.ConsumerRecord;
31+
import org.apache.kafka.common.TopicPartition;
2932
import org.junit.jupiter.api.Test;
33+
import org.mockito.InOrder;
3034

3135
import org.springframework.kafka.KafkaException;
3236
import org.springframework.kafka.support.serializer.DeserializationException;
@@ -42,19 +46,25 @@ public class SeekToCurrentErrorHandlerTests {
4246
public void testClassifier() {
4347
AtomicReference<ConsumerRecord<?, ?>> recovered = new AtomicReference<>();
4448
SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler((r, t) -> recovered.set(r));
45-
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
46-
List<ConsumerRecord<?, ?>> records = Collections.singletonList(record);
49+
ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
50+
ConsumerRecord<String, String> record2 = new ConsumerRecord<>("foo", 0, 1L, "foo", "bar");
51+
List<ConsumerRecord<?, ?>> records = Arrays.asList(record1, record2);
4752
IllegalStateException illegalState = new IllegalStateException();
53+
Consumer<?, ?> consumer = mock(Consumer.class);
4854
assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> handler.handle(illegalState, records,
49-
mock(Consumer.class), mock(MessageListenerContainer.class)))
55+
consumer, mock(MessageListenerContainer.class)))
5056
.withCause(illegalState);
5157
handler.handle(new DeserializationException("intended", null, false, illegalState), records,
52-
mock(Consumer.class), mock(MessageListenerContainer.class));
53-
assertThat(recovered.get()).isSameAs(record);
58+
consumer, mock(MessageListenerContainer.class));
59+
assertThat(recovered.get()).isSameAs(record1);
5460
handler.addNotRetryableException(IllegalStateException.class);
5561
recovered.set(null);
56-
handler.handle(illegalState, records, mock(Consumer.class), mock(MessageListenerContainer.class));
57-
assertThat(recovered.get()).isSameAs(record);
62+
handler.handle(illegalState, records, consumer, mock(MessageListenerContainer.class));
63+
assertThat(recovered.get()).isSameAs(record1);
64+
InOrder inOrder = inOrder(consumer);
65+
inOrder.verify(consumer).seek(new TopicPartition("foo", 0), 0L); // not recovered so seek
66+
inOrder.verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 1L); // 2x recovered seek next
67+
inOrder.verifyNoMoreInteractions();
5868
}
5969

6070
}

0 commit comments

Comments
 (0)