@@ -734,9 +734,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
734734
735735 private long lastAlertAt = this .lastReceive ;
736736
737- private long nackSleep = -1 ;
737+ private long nackSleepDurationMillis = -1 ;
738738
739- private long nackWake ;
739+ private long nackWakeTimeMillis ;
740740
741741 private int nackIndex ;
742742
@@ -1637,9 +1637,9 @@ private void doPauseConsumerIfNecessary() {
16371637 }
16381638
16391639 private void resumeConsumerIfNeccessary () {
1640- if (this .nackWake > 0 ) {
1641- if (System .currentTimeMillis () > this .nackWake ) {
1642- this .nackWake = 0 ;
1640+ if (this .nackWakeTimeMillis > 0 ) {
1641+ if (System .currentTimeMillis () > this .nackWakeTimeMillis ) {
1642+ this .nackWakeTimeMillis = 0 ;
16431643 this .consumer .resume (this .pausedForNack );
16441644 this .logger .debug (() -> "Resumed after nack sleep: " + this .pausedForNack );
16451645 this .pausedForNack .clear ();
@@ -2237,7 +2237,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
22372237
22382238 invokeBatchOnMessageWithRecordsOrList (records , recordList );
22392239 List <ConsumerRecord <?, ?>> toSeek = null ;
2240- if (this .nackSleep >= 0 ) {
2240+ if (this .nackSleepDurationMillis >= 0 ) {
22412241 int index = 0 ;
22422242 toSeek = new ArrayList <>();
22432243 for (ConsumerRecord <K , V > record : records ) {
@@ -2247,7 +2247,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
22472247 }
22482248 }
22492249 if (this .producer != null || (!this .isAnyManualAck && !this .autoCommit )) {
2250- if (this .nackSleep < 0 ) {
2250+ if (this .nackSleepDurationMillis < 0 ) {
22512251 for (ConsumerRecord <K , V > record : getHighestOffsetRecords (records )) {
22522252 this .acks .put (record );
22532253 }
@@ -2391,7 +2391,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
23912391 if (this .commonRecordInterceptor != null ) {
23922392 this .commonRecordInterceptor .afterRecord (record , this .consumer );
23932393 }
2394- if (this .nackSleep >= 0 ) {
2394+ if (this .nackSleepDurationMillis >= 0 ) {
23952395 handleNack (records , record );
23962396 break ;
23972397 }
@@ -2475,7 +2475,7 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
24752475 if (this .commonRecordInterceptor != null ) {
24762476 this .commonRecordInterceptor .afterRecord (record , this .consumer );
24772477 }
2478- if (this .nackSleep >= 0 ) {
2478+ if (this .nackSleepDurationMillis >= 0 ) {
24792479 handleNack (records , record );
24802480 break ;
24812481 }
@@ -2551,8 +2551,8 @@ private boolean recordsEqual(ConsumerRecord<K, V> rec1, ConsumerRecord<K, V> rec
25512551 }
25522552
25532553 private void pauseForNackSleep () {
2554- if (this .nackSleep > 0 ) {
2555- this .nackWake = System .currentTimeMillis () + this .nackSleep ;
2554+ if (this .nackSleepDurationMillis > 0 ) {
2555+ this .nackWakeTimeMillis = System .currentTimeMillis () + this .nackSleepDurationMillis ;
25562556 Set <TopicPartition > alreadyPaused = this .consumer .paused ();
25572557 Collection <TopicPartition > assigned = getAssignedPartitions ();
25582558 if (assigned != null ) {
@@ -2572,7 +2572,7 @@ private void pauseForNackSleep() {
25722572 this .consumer .resume (nowPaused );
25732573 }
25742574 }
2575- this .nackSleep = -1 ;
2575+ this .nackSleepDurationMillis = -1 ;
25762576 }
25772577
25782578 /**
@@ -2667,7 +2667,7 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record) {
26672667 checkDeser (record , SerializationUtils .KEY_DESERIALIZER_EXCEPTION_HEADER );
26682668 }
26692669 doInvokeOnMessage (record );
2670- if (this .nackSleep < 0 && !this .isManualImmediateAck ) {
2670+ if (this .nackSleepDurationMillis < 0 && !this .isManualImmediateAck ) {
26712671 ackCurrent (record );
26722672 }
26732673 }
@@ -3240,11 +3240,11 @@ public void acknowledge() {
32403240 }
32413241
32423242 @ Override
3243- public void nack (long sleep ) {
3243+ public void nack (long sleepMillis ) {
32443244 Assert .state (Thread .currentThread ().equals (ListenerConsumer .this .consumerThread ),
32453245 "nack() can only be called on the consumer thread" );
3246- Assert .isTrue (sleep >= 0 , "sleep cannot be negative" );
3247- ListenerConsumer .this .nackSleep = sleep ;
3246+ Assert .isTrue (sleepMillis >= 0 , "sleepMillis cannot be negative" );
3247+ ListenerConsumer .this .nackSleepDurationMillis = sleepMillis ;
32483248 synchronized (ListenerConsumer .this ) {
32493249 if (ListenerConsumer .this .offsetsInThisBatch != null ) {
32503250 ListenerConsumer .this .offsetsInThisBatch .forEach ((part , recs ) -> recs .clear ());
@@ -3288,13 +3288,13 @@ public void acknowledge() {
32883288 }
32893289
32903290 @ Override
3291- public void nack (int index , long sleep ) {
3291+ public void nack (int index , long sleepMillis ) {
32923292 Assert .state (Thread .currentThread ().equals (ListenerConsumer .this .consumerThread ),
32933293 "nack() can only be called on the consumer thread" );
3294- Assert .isTrue (sleep >= 0 , "sleep cannot be negative" );
3294+ Assert .isTrue (sleepMillis >= 0 , "sleepMillis cannot be negative" );
32953295 Assert .isTrue (index >= 0 && index < this .records .count (), "index out of bounds" );
32963296 ListenerConsumer .this .nackIndex = index ;
3297- ListenerConsumer .this .nackSleep = sleep ;
3297+ ListenerConsumer .this .nackSleepDurationMillis = sleepMillis ;
32983298 synchronized (ListenerConsumer .this ) {
32993299 if (ListenerConsumer .this .offsetsInThisBatch != null ) {
33003300 ListenerConsumer .this .offsetsInThisBatch .forEach ((part , recs ) -> recs .clear ());
0 commit comments