Skip to content

Commit 5168bb3

Browse files
authored
GH-2596: Fix No Seek Error Handling
Resolves #2596 - Do not pause and poll when an error handler "handles" an error - this was a bug in that the remaining records were incorrectly built after the error handler indicated that it had handled the error - Reduce the poll timeout while the consumer is paused because we won't receive any records anyway **cherry-pick to 2.9.x** * Fix new test class name. * Fix test.
1 parent 0cad2d7 commit 5168bb3

11 files changed

+302
-39
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonLoggingErrorHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2022 the original author or authors.
2+
* Copyright 2021-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -47,12 +47,13 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
4747
this.ackAfterHandle = ackAfterHandle;
4848
}
4949

50+
5051
@Override
51-
@Deprecated(since = "2.9", forRemoval = true) // in 3.1
52-
public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
52+
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
5353
MessageListenerContainer container) {
5454

5555
LOGGER.error(thrownException, () -> "Error occured while processing: " + KafkaUtils.format(record));
56+
return true;
5657
}
5758

5859
@Override

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -175,6 +175,10 @@ public enum EOSMode {
175175

176176
private static final double DEFAULT_IDLE_BEFORE_DATA_MULTIPLIER = 5.0;
177177

178+
private static final long ONE_HUNDRED = 100L;
179+
180+
private static final Duration DEFAULT_PAUSED_POLL_TIMEOUT = Duration.ofMillis(ONE_HUNDRED);
181+
178182
private final Map<String, String> micrometerTags = new HashMap<>();
179183

180184
private final List<Advice> adviceChain = new ArrayList<>();
@@ -188,7 +192,7 @@ public enum EOSMode {
188192
* when they all have been processed by the listener</li>
189193
* <li>TIME: Commit pending offsets after {@link #setAckTime(long) ackTime} number of
190194
* milliseconds; (should be greater than
191-
* {@code #setPollTimeout(long) pollTimeout}.</li>
195+
* {@code ConsumerProperties#setPollTimeout(long) pollTimeout}.</li>
192196
* <li>COUNT: Commit pending offsets after at least {@link #setAckCount(int) ackCount}
193197
* number of records have been processed</li>
194198
* <li>COUNT_TIME: Commit pending offsets after {@link #setAckTime(long) ackTime}
@@ -287,6 +291,8 @@ public enum EOSMode {
287291

288292
private KafkaListenerObservationConvention observationConvention;
289293

294+
private Duration pollTimeoutWhilePaused = DEFAULT_PAUSED_POLL_TIMEOUT;
295+
290296
/**
291297
* Create properties for a container that will subscribe to the specified topics.
292298
* @param topics the topics.
@@ -336,7 +342,7 @@ public void setMessageListener(Object messageListener) {
336342
* when they all have been processed by the listener</li>
337343
* <li>TIME: Commit pending offsets after {@link #setAckTime(long) ackTime} number of
338344
* milliseconds; (should be greater than
339-
* {@code #setPollTimeout(long) pollTimeout}.</li>
345+
* {@code ConsumerProperties#setPollTimeout(long) pollTimeout}.</li>
340346
* <li>COUNT: Commit pending offsets after at least {@link #setAckCount(int) ackCount}
341347
* number of records have been processed</li>
342348
* <li>COUNT_TIME: Commit pending offsets after {@link #setAckTime(long) ackTime}
@@ -568,11 +574,10 @@ public float getNoPollThreshold() {
568574
}
569575

570576
/**
571-
* If the time since the last poll / {@link #getPollTimeout() poll timeout}
572-
* exceeds this value, a NonResponsiveConsumerEvent is published.
573-
* This value should be more than 1.0 to avoid a race condition that can cause
574-
* spurious events to be published.
575-
* Default {@value #DEFAULT_NO_POLL_THRESHOLD}.
577+
* If the time since the last poll / {@link ConsumerProperties#getPollTimeout() poll
578+
* timeout} exceeds this value, a NonResponsiveConsumerEvent is published. This value
579+
* should be more than 1.0 to avoid a race condition that can cause spurious events to
580+
* be published. Default {@value #DEFAULT_NO_POLL_THRESHOLD}.
576581
* @param noPollThreshold the threshold
577582
* @since 1.3.1
578583
*/
@@ -945,6 +950,28 @@ public void setObservationConvention(KafkaListenerObservationConvention observat
945950
this.observationConvention = observationConvention;
946951
}
947952

953+
/**
954+
* The poll timeout to use while paused; usually a lower number than
955+
* {@link ConsumerProperties#setPollTimeout(long) pollTimeout}.
956+
* @return the pollTimeoutWhilePaused
957+
* @since 2.9.7
958+
*/
959+
public Duration getPollTimeoutWhilePaused() {
960+
return this.pollTimeoutWhilePaused;
961+
}
962+
963+
/**
964+
* Set the poll timeout to use while paused; usually a lower number than
965+
* {@link ConsumerProperties#setPollTimeout(long) pollTimeout}. Should be greater than
966+
* zero to avoid a tight CPU loop while the consumer is paused. Default is 100ms.
967+
* @param pollTimeoutWhilePaused the pollTimeoutWhilePaused to set
968+
* @since 2.9.7
969+
*/
970+
public void setPollTimeoutWhilePaused(Duration pollTimeoutWhilePaused) {
971+
Assert.notNull(pollTimeoutWhilePaused, "'pollTimeoutWhilePaused' cannot be null");
972+
this.pollTimeoutWhilePaused = pollTimeoutWhilePaused;
973+
}
974+
948975
@Override
949976
public String toString() {
950977
return "ContainerProperties ["
@@ -967,6 +994,7 @@ public String toString() {
967994
+ "\n monitorInterval=" + this.monitorInterval
968995
+ (this.scheduler != null ? "\n scheduler=" + this.scheduler : "")
969996
+ "\n noPollThreshold=" + this.noPollThreshold
997+
+ "\n pollTimeoutWhilePaused=" + this.pollTimeoutWhilePaused
970998
+ "\n subBatchPerPartition=" + this.subBatchPerPartition
971999
+ "\n assignmentCommitOption=" + this.assignmentCommitOption
9721000
+ "\n deliveryAttemptHeader=" + this.deliveryAttemptHeader

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
692692

693693
private final Duration pollTimeout = Duration.ofMillis(this.containerProperties.getPollTimeout());
694694

695+
private final Duration pollTimeoutWhilePaused = this.containerProperties.getPollTimeoutWhilePaused();
696+
695697
private final boolean checkNullKeyForExceptions;
696698

697699
private final boolean checkNullValueForExceptions;
@@ -1692,7 +1694,7 @@ private ConsumerRecords<K, V> doPoll() {
16921694
private ConsumerRecords<K, V> pollConsumer() {
16931695
beforePoll();
16941696
try {
1695-
return this.consumer.poll(this.pollTimeout);
1697+
return this.consumer.poll(this.consumerPaused ? this.pollTimeoutWhilePaused : this.pollTimeout);
16961698
}
16971699
catch (WakeupException ex) {
16981700
return ConsumerRecords.empty();
@@ -2915,17 +2917,23 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> cRecord,
29152917
KafkaMessageListenerContainer.this.thisOrParentContainer);
29162918
}
29172919
else {
2918-
boolean handled = this.commonErrorHandler.handleOne(rte, cRecord, this.consumer,
2919-
KafkaMessageListenerContainer.this.thisOrParentContainer);
2920+
boolean handled = false;
2921+
try {
2922+
handled = this.commonErrorHandler.handleOne(rte, cRecord, this.consumer,
2923+
KafkaMessageListenerContainer.this.thisOrParentContainer);
2924+
}
2925+
catch (Exception ex) {
2926+
this.logger.error(ex, "ErrorHandler threw unexpected exception");
2927+
}
29202928
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
29212929
if (!handled) {
29222930
records.computeIfAbsent(new TopicPartition(cRecord.topic(), cRecord.partition()),
29232931
tp -> new ArrayList<ConsumerRecord<K, V>>()).add(cRecord);
2924-
}
2925-
while (iterator.hasNext()) {
2926-
ConsumerRecord<K, V> next = iterator.next();
2927-
records.computeIfAbsent(new TopicPartition(next.topic(), next.partition()),
2928-
tp -> new ArrayList<ConsumerRecord<K, V>>()).add(next);
2932+
while (iterator.hasNext()) {
2933+
ConsumerRecord<K, V> next = iterator.next();
2934+
records.computeIfAbsent(new TopicPartition(next.topic(), next.partition()),
2935+
tp -> new ArrayList<ConsumerRecord<K, V>>()).add(next);
2936+
}
29292937
}
29302938
if (!records.isEmpty()) {
29312939
this.remainingRecords = new ConsumerRecords<>(records);

spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerPauseImmediateTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -89,7 +89,7 @@ public void pausesImmediately() throws Exception {
8989
inOrder.verify(this.consumer).assign(any(Collection.class));
9090
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
9191
inOrder.verify(this.consumer).pause(any());
92-
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
92+
inOrder.verify(this.consumer).poll(Duration.ZERO);
9393
assertThat(this.config.count).isEqualTo(4);
9494
assertThat(this.config.contents).contains("foo", "bar", "baz", "qux");
9595
this.registry.getListenerContainer("id").resume();
@@ -178,7 +178,7 @@ public Consumer consumer(KafkaListenerEndpointRegistry registry) {
178178
}
179179
return new ConsumerRecords(Collections.emptyMap());
180180
}
181-
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
181+
}).given(consumer).poll(any());
182182
List<TopicPartition> paused = new ArrayList<>();
183183
willAnswer(i -> {
184184
this.commitLatch.countDown();
@@ -209,6 +209,7 @@ ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(KafkaListe
209209
factory.setConsumerFactory(consumerFactory(registry));
210210
factory.getContainerProperties().setAckMode(AckMode.RECORD);
211211
factory.getContainerProperties().setPauseImmediate(true);
212+
factory.getContainerProperties().setPollTimeoutWhilePaused(Duration.ZERO);
212213
return factory;
213214
}
214215

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksBatchAckTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -101,6 +101,7 @@ public void retriesWithNoSeeksAckModeBatch() throws Exception {
101101
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(1L));
102102
inOrder.verify(this.consumer).commitSync(offsets, Duration.ofSeconds(60));
103103
inOrder.verify(this.consumer).pause(any());
104+
inOrder.verify(this.consumer).poll(Duration.ZERO);
104105
inOrder.verify(this.consumer).resume(any());
105106
offsets = new LinkedHashMap<>();
106107
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
@@ -190,7 +191,7 @@ public Consumer consumer() {
190191
}
191192
return ConsumerRecords.empty();
192193
}
193-
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
194+
}).given(consumer).poll(any());
194195
willAnswer(i -> {
195196
this.commitLatch.countDown();
196197
return null;
@@ -208,6 +209,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
208209
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
209210
factory.setConsumerFactory(consumerFactory());
210211
factory.getContainerProperties().setAckMode(AckMode.BATCH);
212+
factory.getContainerProperties().setPollTimeoutWhilePaused(Duration.ZERO);
211213
DefaultErrorHandler eh = new DefaultErrorHandler();
212214
eh.setSeekAfterError(false);
213215
factory.setCommonErrorHandler(eh);

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksBatchListenerTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -112,6 +112,7 @@ void retriesWithNoSeeksBatchListener() throws Exception {
112112
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(1L));
113113
inOrder.verify(this.consumer).commitSync(offsets, Duration.ofSeconds(60));
114114
inOrder.verify(this.consumer).pause(any());
115+
inOrder.verify(this.consumer).poll(Duration.ZERO);
115116
offsets = new LinkedHashMap<>();
116117
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
117118
offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
@@ -202,7 +203,7 @@ public Consumer consumer() {
202203
}
203204
return new ConsumerRecords(Collections.emptyMap());
204205
}
205-
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
206+
}).given(consumer).poll(any());
206207
willAnswer(i -> {
207208
this.commitLatch.countDown();
208209
return null;
@@ -221,6 +222,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
221222
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
222223
factory.setConsumerFactory(consumerFactory());
223224
factory.setBatchListener(true);
225+
factory.getContainerProperties().setPollTimeoutWhilePaused(Duration.ZERO);
224226
DefaultErrorHandler eh = new DefaultErrorHandler() {
225227

226228
@Override

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksRecordAckNoResumeContainerPausedTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -104,7 +104,7 @@ public void doesNotResumeIfPartitionPaused() throws Exception {
104104
Collections.singletonMap(new TopicPartition("foo", 1), new OffsetAndMetadata(1L)),
105105
Duration.ofSeconds(60));
106106
inOrder.verify(this.consumer).pause(any());
107-
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
107+
inOrder.verify(this.consumer).poll(Duration.ZERO);
108108
verify(this.consumer, never()).resume(any());
109109
assertThat(this.config.count).isEqualTo(4);
110110
assertThat(this.config.contents).contains("foo", "bar", "baz", "qux");
@@ -186,7 +186,7 @@ public Consumer consumer(KafkaListenerEndpointRegistry registry) {
186186
}
187187
return new ConsumerRecords(Collections.emptyMap());
188188
}
189-
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
189+
}).given(consumer).poll(any());
190190
List<TopicPartition> paused = new ArrayList<>();
191191
willAnswer(i -> {
192192
this.commitLatch.countDown();
@@ -217,6 +217,7 @@ ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(KafkaListe
217217
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
218218
factory.setConsumerFactory(consumerFactory(registry));
219219
factory.getContainerProperties().setAckMode(AckMode.RECORD);
220+
factory.getContainerProperties().setPollTimeoutWhilePaused(Duration.ZERO);
220221
DefaultErrorHandler eh = new DefaultErrorHandler();
221222
eh.setSeekAfterError(false);
222223
factory.setCommonErrorHandler(eh);

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksRecordAckNoResumePartitionTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -106,7 +106,7 @@ public void doesNotResumeIfPartitionPaused() throws Exception {
106106
Collections.singletonMap(new TopicPartition("foo", 1), new OffsetAndMetadata(1L)),
107107
Duration.ofSeconds(60));
108108
inOrder.verify(this.consumer).pause(any());
109-
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
109+
inOrder.verify(this.consumer).poll(Duration.ZERO);
110110
verify(this.consumer, never()).resume(any());
111111
assertThat(this.config.count).isEqualTo(4);
112112
assertThat(this.config.contents).contains("foo", "bar", "baz", "qux");
@@ -195,7 +195,7 @@ public Consumer consumer(KafkaListenerEndpointRegistry registry) {
195195
}
196196
return new ConsumerRecords(Collections.emptyMap());
197197
}
198-
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
198+
}).given(consumer).poll(any());
199199
List<TopicPartition> paused = new ArrayList<>();
200200
willAnswer(i -> {
201201
this.commitLatch.countDown();
@@ -227,6 +227,7 @@ ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(KafkaListe
227227
factory.setConsumerFactory(consumerFactory(registry));
228228
factory.getContainerProperties().setAckMode(AckMode.RECORD);
229229
factory.getContainerProperties().setDeliveryAttemptHeader(true);
230+
factory.getContainerProperties().setPollTimeoutWhilePaused(Duration.ZERO);
230231
factory.setRecordInterceptor((record, consumer) -> {
231232
Config.this.deliveryAttempt = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
232233
return record;

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksRecordAckTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -112,7 +112,7 @@ public void retriesWithNoSeeksAckModeRecord() throws Exception {
112112
Collections.singletonMap(new TopicPartition("foo", 1), new OffsetAndMetadata(1L)),
113113
Duration.ofSeconds(60));
114114
inOrder.verify(this.consumer).pause(any());
115-
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
115+
inOrder.verify(this.consumer).poll(Duration.ZERO);
116116
inOrder.verify(this.consumer).commitSync(
117117
Collections.singletonMap(new TopicPartition("foo", 1), new OffsetAndMetadata(2L)),
118118
Duration.ofSeconds(60));
@@ -243,9 +243,9 @@ public Consumer consumer() {
243243
catch (InterruptedException e) {
244244
Thread.currentThread().interrupt();
245245
}
246-
return new ConsumerRecords(Collections.emptyMap());
246+
return ConsumerRecords.empty();
247247
}
248-
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
248+
}).given(consumer).poll(any());
249249
List<TopicPartition> paused = new ArrayList<>();
250250
willAnswer(i -> {
251251
this.commitLatch.countDown();
@@ -276,6 +276,7 @@ ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
276276
factory.setConsumerFactory(consumerFactory());
277277
factory.getContainerProperties().setAckMode(AckMode.RECORD);
278278
factory.getContainerProperties().setDeliveryAttemptHeader(true);
279+
factory.getContainerProperties().setPollTimeoutWhilePaused(Duration.ZERO);
279280
factory.setRecordInterceptor((record, consumer) -> {
280281
Config.this.deliveryAttempt = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
281282
return record;

0 commit comments

Comments
 (0)