@@ -782,6 +782,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
782782
783783 private ConsumerRecords <K , V > pendingRecordsAfterError ;
784784
785+ private boolean pauseForPending ;
786+
785787 private volatile boolean consumerPaused ;
786788
787789 private volatile Thread consumerThread ;
@@ -1566,7 +1568,10 @@ private ConsumerRecords<K, V> doPoll() {
15661568 + "after an error; emergency stop invoked to avoid message loss" , howManyRecords ));
15671569 KafkaMessageListenerContainer .this .emergencyStop .run ();
15681570 }
1569- if (!isPartitionPaused (this .pendingRecordsAfterError .partitions ().iterator ().next ())) {
1571+ TopicPartition firstPart = this .pendingRecordsAfterError .partitions ().iterator ().next ();
1572+ boolean isPaused = isPartitionPauseRequested (firstPart );
1573+ this .logger .debug (() -> "First pending after error: " + firstPart + "; paused: " + isPaused );
1574+ if (!isPaused ) {
15701575 records = this .pendingRecordsAfterError ;
15711576 this .pendingRecordsAfterError = null ;
15721577 }
@@ -1682,10 +1687,11 @@ private void doPauseConsumerIfNecessary() {
16821687 this .logger .debug (() -> "Pausing for incomplete async acks: " + this .offsetsInThisBatch );
16831688 }
16841689 if (!this .consumerPaused && (isPaused () || this .pausedForAsyncAcks )
1685- || this .pendingRecordsAfterError != null ) {
1690+ || this .pauseForPending ) {
16861691
16871692 this .consumer .pause (this .consumer .assignment ());
16881693 this .consumerPaused = true ;
1694+ this .pauseForPending = false ;
16891695 this .logger .debug (() -> "Paused consumption from: " + this .consumer .paused ());
16901696 publishConsumerPausedEvent (this .consumer .assignment ());
16911697 }
@@ -2385,6 +2391,7 @@ private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
23852391 () -> invokeBatchOnMessageWithRecordsOrList (records , list ));
23862392 if (!afterHandling .isEmpty ()) {
23872393 this .pendingRecordsAfterError = afterHandling ;
2394+ this .pauseForPending = true ;
23882395 }
23892396 }
23902397 }
@@ -2778,6 +2785,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
27782785 }
27792786 if (records .size () > 0 ) {
27802787 this .pendingRecordsAfterError = new ConsumerRecords <>(records );
2788+ this .pauseForPending = true ;
27812789 }
27822790 }
27832791 }
0 commit comments