1818
1919import java .nio .ByteBuffer ;
2020import java .time .Duration ;
21- import java .util .AbstractMap .SimpleEntry ;
2221import java .util .ArrayList ;
2322import java .util .Arrays ;
2423import java .util .Collection ;
2524import java .util .Collections ;
25+ import java .util .Comparator ;
2626import java .util .HashMap ;
2727import java .util .HashSet ;
2828import java .util .Iterator ;
@@ -305,8 +305,7 @@ public boolean isContainerPaused() {
305305
306306 @ Override
307307 public boolean isPartitionPaused (TopicPartition topicPartition ) {
308- return this .listenerConsumer != null && this .listenerConsumer
309- .isPartitionPaused (topicPartition );
308+ return this .listenerConsumer != null && this .listenerConsumer .isPartitionPaused (topicPartition );
310309 }
311310
312311 @ Override
@@ -317,33 +316,28 @@ public boolean isInExpectedState() {
317316 @ Override
318317 public void enforceRebalance () {
319318 this .thisOrParentContainer .enforceRebalanceRequested .set (true );
320- KafkaMessageListenerContainer <K , V >.ListenerConsumer consumer = this .listenerConsumer ;
321- if (consumer != null ) {
322- consumer .wakeIfNecessary ();
323- }
319+ consumerWakeIfNecessary ();
324320 }
325321
326322 @ Override
327323 public void pause () {
328324 super .pause ();
329- KafkaMessageListenerContainer <K , V >.ListenerConsumer consumer = this .listenerConsumer ;
330- if (consumer != null ) {
331- consumer .wakeIfNecessary ();
332- }
325+ consumerWakeIfNecessary ();
333326 }
334327
335328 @ Override
336329 public void resume () {
337330 super .resume ();
338- KafkaMessageListenerContainer <K , V >.ListenerConsumer consumer = this .listenerConsumer ;
339- if (consumer != null ) {
340- consumer .wakeIfNecessary ();
341- }
331+ consumerWakeIfNecessary ();
342332 }
343333
344334 @ Override
345335 public void resumePartition (TopicPartition topicPartition ) {
346336 super .resumePartition (topicPartition );
337+ consumerWakeIfNecessary ();
338+ }
339+
340+ private void consumerWakeIfNecessary () {
347341 KafkaMessageListenerContainer <K , V >.ListenerConsumer consumer = this .listenerConsumer ;
348342 if (consumer != null ) {
349343 consumer .wakeIfNecessary ();
@@ -422,15 +416,11 @@ private void checkAckMode(ContainerProperties containerProperties) {
422416 }
423417
424418 private ListenerType determineListenerType (GenericMessageListener <?> listener ) {
425- ListenerType listenerType = ListenerUtils .determineListenerType (listener );
426- if (listener instanceof DelegatingMessageListener ) {
427- Object delegating = listener ;
428- while (delegating instanceof DelegatingMessageListener <?> dml ) {
429- delegating = dml .getDelegate ();
430- }
431- listenerType = ListenerUtils .determineListenerType (delegating );
419+ Object delegating = listener ;
420+ while (delegating instanceof DelegatingMessageListener <?> dml ) {
421+ delegating = dml .getDelegate ();
432422 }
433- return listenerType ;
423+ return ListenerUtils . determineListenerType ( delegating ) ;
434424 }
435425
436426 @ Override
@@ -1586,7 +1576,7 @@ private void fixTxOffsetsIfNeeded() {
15861576 this .lastCommits .forEach ((tp , oamd ) -> {
15871577 long position = this .consumer .position (tp );
15881578 Long saved = this .savedPositions .get (tp );
1589- if (saved != null && saved . longValue () != position ) {
1579+ if (saved != null && saved != position ) {
15901580 this .logger .debug (() -> "Skipping TX offset correction - seek(s) have been performed; "
15911581 + "saved: " + this .savedPositions + ", "
15921582 + "committed: " + oamd + ", "
@@ -1609,9 +1599,7 @@ private void fixTxOffsetsIfNeeded() {
16091599 }
16101600 else {
16111601 this .transactionTemplate .executeWithoutResult (status -> {
1612- doSendOffsets (((KafkaResourceHolder ) TransactionSynchronizationManager
1613- .getResource (this .kafkaTxManager .getProducerFactory ()))
1614- .getProducer (), toFix );
1602+ doSendOffsets (getTxProducer (), toFix );
16151603 });
16161604 }
16171605 }
@@ -2088,7 +2076,7 @@ private synchronized void ackInOrder(ConsumerRecord<K, V> cRecord) {
20882076 offs .remove (0 );
20892077 ConsumerRecord <K , V > recordToAck = cRecord ;
20902078 if (!deferred .isEmpty ()) {
2091- Collections .sort (deferred , ( a , b ) -> Long . compare ( a . offset (), b . offset () ));
2079+ deferred .sort (Comparator . comparingLong ( ConsumerRecord :: offset ));
20922080 while (!ObjectUtils .isEmpty (deferred ) && deferred .get (0 ).offset () == recordToAck .offset () + 1 ) {
20932081 recordToAck = deferred .remove (0 );
20942082 offs .remove (0 );
@@ -2195,9 +2183,7 @@ private void invokeBatchListenerInTx(final ConsumerRecords<K, V> records,
21952183 @ Override
21962184 public void doInTransactionWithoutResult (TransactionStatus s ) {
21972185 if (ListenerConsumer .this .kafkaTxManager != null ) {
2198- ListenerConsumer .this .producer = ((KafkaResourceHolder ) TransactionSynchronizationManager
2199- .getResource (ListenerConsumer .this .kafkaTxManager .getProducerFactory ()))
2200- .getProducer (); // NOSONAR nullable
2186+ ListenerConsumer .this .producer = getTxProducer ();
22012187 }
22022188 RuntimeException aborted = doInvokeBatchListener (records , recordList );
22032189 if (aborted != null ) {
@@ -2254,10 +2240,9 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
22542240 }
22552241
22562242 private List <ConsumerRecord <K , V >> createRecordList (final ConsumerRecords <K , V > records ) {
2257- Iterator <ConsumerRecord <K , V >> iterator = records .iterator ();
22582243 List <ConsumerRecord <K , V >> list = new LinkedList <>();
2259- while ( iterator . hasNext () ) {
2260- list .add (iterator . next () );
2244+ for ( ConsumerRecord < K , V > record : records ) {
2245+ list .add (record );
22612246 }
22622247 return list ;
22632248 }
@@ -2324,9 +2309,7 @@ private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecords<K, V>
23242309 || this .producer != null ) {
23252310 if (this .remainingRecords != null ) {
23262311 ConsumerRecord <K , V > firstUncommitted = this .remainingRecords .iterator ().next ();
2327- Iterator <ConsumerRecord <K , V >> it = records .iterator ();
2328- while (it .hasNext ()) {
2329- ConsumerRecord <K , V > next = it .next ();
2312+ for (ConsumerRecord <K , V > next : records ) {
23302313 if (!next .equals (firstUncommitted )) {
23312314 this .acks .add (next );
23322315 }
@@ -2433,7 +2416,7 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
24332416 ConsumerRecords <K , V > records = recordsArg ;
24342417 List <ConsumerRecord <K , V >> recordList = recordListArg ;
24352418 if (this .listenerinfo != null ) {
2436- records .iterator (). forEachRemaining (this ::listenerInfo );
2419+ records .forEach (this ::listenerInfo );
24372420 }
24382421 if (this .batchInterceptor != null ) {
24392422 records = this .batchInterceptor .intercept (recordsArg , this .consumer );
@@ -2516,7 +2499,6 @@ private void invokeRecordListener(final ConsumerRecords<K, V> records) {
25162499 * Invoke the listener with each record in a separate transaction.
25172500 * @param records the records.
25182501 */
2519- @ SuppressWarnings (RAWTYPES ) // NOSONAR complexity
25202502 private void invokeRecordListenerInTx (final ConsumerRecords <K , V > records ) {
25212503 Iterator <ConsumerRecord <K , V >> iterator = records .iterator ();
25222504 while (iterator .hasNext ()) {
@@ -2561,9 +2543,7 @@ private void invokeInTransaction(Iterator<ConsumerRecord<K, V>> iterator, final
25612543 @ Override
25622544 public void doInTransactionWithoutResult (TransactionStatus s ) {
25632545 if (ListenerConsumer .this .kafkaTxManager != null ) {
2564- ListenerConsumer .this .producer = ((KafkaResourceHolder ) TransactionSynchronizationManager
2565- .getResource (ListenerConsumer .this .kafkaTxManager .getProducerFactory ()))
2566- .getProducer (); // NOSONAR
2546+ ListenerConsumer .this .producer = getTxProducer ();
25672547 }
25682548 RuntimeException aborted = doInvokeRecordListener (cRecord , iterator );
25692549 if (aborted != null ) {
@@ -2579,9 +2559,7 @@ private void recordAfterRollback(Iterator<ConsumerRecord<K, V>> iterator, final
25792559
25802560 List <ConsumerRecord <K , V >> unprocessed = new ArrayList <>();
25812561 unprocessed .add (cRecord );
2582- while (iterator .hasNext ()) {
2583- unprocessed .add (iterator .next ());
2584- }
2562+ iterator .forEachRemaining (unprocessed ::add );
25852563 @ SuppressWarnings (UNCHECKED )
25862564 AfterRollbackProcessor <K , V > afterRollbackProcessorToUse =
25872565 (AfterRollbackProcessor <K , V >) getAfterRollbackProcessor ();
@@ -2639,11 +2617,10 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
26392617 private boolean checkImmediatePause (Iterator <ConsumerRecord <K , V >> iterator ) {
26402618 if (isPauseRequested () && this .pauseImmediate ) {
26412619 Map <TopicPartition , List <ConsumerRecord <K , V >>> remaining = new LinkedHashMap <>();
2642- while (iterator .hasNext ()) {
2643- ConsumerRecord <K , V > next = iterator .next ();
2620+ iterator .forEachRemaining (next -> {
26442621 remaining .computeIfAbsent (new TopicPartition (next .topic (), next .partition ()),
2645- tp -> new ArrayList <ConsumerRecord < K , V > >()).add (next );
2646- }
2622+ tp -> new ArrayList <>()).add (next );
2623+ });
26472624 if (!remaining .isEmpty ()) {
26482625 this .remainingRecords = new ConsumerRecords <>(remaining );
26492626 return true ;
@@ -2712,9 +2689,7 @@ private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecor
27122689 processCommits ();
27132690 }
27142691 List <ConsumerRecord <?, ?>> list = new ArrayList <>();
2715- Iterator <ConsumerRecord <K , V >> iterator2 = records .iterator ();
2716- while (iterator2 .hasNext ()) {
2717- ConsumerRecord <K , V > next = iterator2 .next ();
2692+ for (ConsumerRecord <K , V > next : records ) {
27182693 if (!list .isEmpty () || recordsEqual (cRecord , next )) {
27192694 list .add (next );
27202695 }
@@ -2755,6 +2730,13 @@ private void pauseForNackSleep() {
27552730 this .nackSleepDurationMillis = -1 ;
27562731 }
27572732
2733+ @ SuppressWarnings (RAWTYPES )
2734+ private Producer <?, ?> getTxProducer () {
2735+ return ((KafkaResourceHolder ) TransactionSynchronizationManager
2736+ .getResource (ListenerConsumer .this .kafkaTxManager .getProducerFactory ()))
2737+ .getProducer (); // NOSONAR
2738+ }
2739+
27582740 /**
27592741 * Actually invoke the listener.
27602742 * @param cRecord the record.
@@ -2911,9 +2893,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> cRecord,
29112893 }
29122894 List <ConsumerRecord <?, ?>> records = new ArrayList <>();
29132895 records .add (cRecord );
2914- while (iterator .hasNext ()) {
2915- records .add (iterator .next ());
2916- }
2896+ iterator .forEachRemaining (records ::add );
29172897 this .commonErrorHandler .handleRemaining (rte , records , this .consumer ,
29182898 KafkaMessageListenerContainer .this .thisOrParentContainer );
29192899 }
@@ -2929,12 +2909,9 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> cRecord,
29292909 Map <TopicPartition , List <ConsumerRecord <K , V >>> records = new LinkedHashMap <>();
29302910 if (!handled ) {
29312911 records .computeIfAbsent (new TopicPartition (cRecord .topic (), cRecord .partition ()),
2932- tp -> new ArrayList <ConsumerRecord <K , V >>()).add (cRecord );
2933- while (iterator .hasNext ()) {
2934- ConsumerRecord <K , V > next = iterator .next ();
2935- records .computeIfAbsent (new TopicPartition (next .topic (), next .partition ()),
2936- tp -> new ArrayList <ConsumerRecord <K , V >>()).add (next );
2937- }
2912+ tp -> new ArrayList <>()).add (cRecord );
2913+ iterator .forEachRemaining (next -> records .computeIfAbsent (
2914+ new TopicPartition (next .topic (), next .partition ()), tp -> new ArrayList <>()).add (next ));
29382915 }
29392916 if (!records .isEmpty ()) {
29402917 this .remainingRecords = new ConsumerRecords <>(records );
@@ -3201,9 +3178,7 @@ private void initPartitionsIfNeeded() {
32013178 doInitialSeeks (partitions , beginnings , ends );
32023179 if (this .consumerSeekAwareListener != null ) {
32033180 this .consumerSeekAwareListener .onPartitionsAssigned (this .definedPartitions .keySet ().stream ()
3204- .map (tp -> new SimpleEntry <>(tp , this .consumer .position (tp )))
3205- .collect (Collectors .toMap (entry -> entry .getKey (), entry -> entry .getValue ())),
3206- this .seekCallback );
3181+ .collect (Collectors .toMap (tp -> tp , this .consumer ::position )), this .seekCallback );
32073182 }
32083183 }
32093184
@@ -3884,20 +3859,13 @@ private Long computeBackwardWhereTo(long offset, boolean toCurrent, TopicPartiti
38843859 }
38853860
38863861
3887- private static final class OffsetMetadata {
3888-
3889- final Long offset ; // NOSONAR
3890-
3891- final boolean relativeToCurrent ; // NOSONAR
3892-
3893- final SeekPosition seekPosition ; // NOSONAR
3894-
3895- OffsetMetadata (Long offset , boolean relativeToCurrent , SeekPosition seekPosition ) {
3896- this .offset = offset ;
3897- this .relativeToCurrent = relativeToCurrent ;
3898- this .seekPosition = seekPosition ;
3899- }
3900-
3862+ /**
3863+ * Offset metadata record.
3864+ * @param offset current offset.
3865+ * @param relativeToCurrent relative to current.
3866+ * @param seekPosition seek position strategy.
3867+ */
3868+ private record OffsetMetadata (Long offset , boolean relativeToCurrent , SeekPosition seekPosition ) {
39013869 }
39023870
39033871 private class StopCallback implements BiConsumer <Object , Throwable > {
0 commit comments