diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 2014c78406..14fcd2e3aa 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -305,8 +305,7 @@ public boolean isContainerPaused() { @Override public boolean isPartitionPaused(TopicPartition topicPartition) { - return this.listenerConsumer != null && this.listenerConsumer - .isPartitionPaused(topicPartition); + return this.listenerConsumer != null && this.listenerConsumer.isPartitionPaused(topicPartition); } @Override @@ -317,33 +316,28 @@ public boolean isInExpectedState() { @Override public void enforceRebalance() { this.thisOrParentContainer.enforceRebalanceRequested.set(true); - KafkaMessageListenerContainer.ListenerConsumer consumer = this.listenerConsumer; - if (consumer != null) { - consumer.wakeIfNecessary(); - } + consumerWakeIfNecessary(); } @Override public void pause() { super.pause(); - KafkaMessageListenerContainer.ListenerConsumer consumer = this.listenerConsumer; - if (consumer != null) { - consumer.wakeIfNecessary(); - } + consumerWakeIfNecessary(); } @Override public void resume() { super.resume(); - KafkaMessageListenerContainer.ListenerConsumer consumer = this.listenerConsumer; - if (consumer != null) { - consumer.wakeIfNecessary(); - } + consumerWakeIfNecessary(); } @Override public void resumePartition(TopicPartition topicPartition) { super.resumePartition(topicPartition); + consumerWakeIfNecessary(); + } + + private void consumerWakeIfNecessary() { KafkaMessageListenerContainer.ListenerConsumer consumer = this.listenerConsumer; if (consumer != null) { consumer.wakeIfNecessary(); @@ -422,15 +416,11 @@ private void checkAckMode(ContainerProperties containerProperties) { } private ListenerType determineListenerType(GenericMessageListener listener) { - ListenerType listenerType = ListenerUtils.determineListenerType(listener); - if (listener instanceof DelegatingMessageListener) { - Object delegating = listener; - while (delegating instanceof DelegatingMessageListener dml) { - delegating = dml.getDelegate(); - } - listenerType = ListenerUtils.determineListenerType(delegating); + Object delegating = listener; + while (delegating instanceof DelegatingMessageListener dml) { + delegating = dml.getDelegate(); } - return listenerType; + return ListenerUtils.determineListenerType(delegating); } @Override @@ -1586,7 +1576,7 @@ private void fixTxOffsetsIfNeeded() { this.lastCommits.forEach((tp, oamd) -> { long position = this.consumer.position(tp); Long saved = this.savedPositions.get(tp); - if (saved != null && saved.longValue() != position) { + if (saved != null && saved != position) { this.logger.debug(() -> "Skipping TX offset correction - seek(s) have been performed; " + "saved: " + this.savedPositions + ", " + "committed: " + oamd + ", " @@ -1609,9 +1599,7 @@ private void fixTxOffsetsIfNeeded() { } else { this.transactionTemplate.executeWithoutResult(status -> { - doSendOffsets(((KafkaResourceHolder) TransactionSynchronizationManager - .getResource(this.kafkaTxManager.getProducerFactory())) - .getProducer(), toFix); + doSendOffsets(getTxProducer(), toFix); }); } } @@ -2195,9 +2183,7 @@ private void invokeBatchListenerInTx(final ConsumerRecords records, @Override public void doInTransactionWithoutResult(TransactionStatus s) { if (ListenerConsumer.this.kafkaTxManager != null) { - ListenerConsumer.this.producer = ((KafkaResourceHolder) TransactionSynchronizationManager - .getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory())) - .getProducer(); // NOSONAR nullable + ListenerConsumer.this.producer = getTxProducer(); } RuntimeException aborted = doInvokeBatchListener(records, recordList); if (aborted != null) { @@ -2516,7 +2502,6 @@ private void invokeRecordListener(final ConsumerRecords records) { * Invoke the listener with each record in a separate transaction. * @param records the records. */ - @SuppressWarnings(RAWTYPES) // NOSONAR complexity private void invokeRecordListenerInTx(final ConsumerRecords records) { Iterator> iterator = records.iterator(); while (iterator.hasNext()) { @@ -2561,9 +2546,7 @@ private void invokeInTransaction(Iterator> iterator, final @Override public void doInTransactionWithoutResult(TransactionStatus s) { if (ListenerConsumer.this.kafkaTxManager != null) { - ListenerConsumer.this.producer = ((KafkaResourceHolder) TransactionSynchronizationManager - .getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory())) - .getProducer(); // NOSONAR + ListenerConsumer.this.producer = getTxProducer(); } RuntimeException aborted = doInvokeRecordListener(cRecord, iterator); if (aborted != null) { @@ -2755,6 +2738,13 @@ private void pauseForNackSleep() { this.nackSleepDurationMillis = -1; } + @SuppressWarnings(RAWTYPES) + private Producer getTxProducer() { + return ((KafkaResourceHolder) TransactionSynchronizationManager + .getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory())) + .getProducer(); // NOSONAR + } + /** * Actually invoke the listener. * @param cRecord the record. @@ -3884,20 +3874,13 @@ private Long computeBackwardWhereTo(long offset, boolean toCurrent, TopicPartiti } - private static final class OffsetMetadata { - - final Long offset; // NOSONAR - - final boolean relativeToCurrent; // NOSONAR - - final SeekPosition seekPosition; // NOSONAR - - OffsetMetadata(Long offset, boolean relativeToCurrent, SeekPosition seekPosition) { - this.offset = offset; - this.relativeToCurrent = relativeToCurrent; - this.seekPosition = seekPosition; - } - + /** + * Offset metadata record. + * @param offset current offset. + * @param relativeToCurrent relative to current. + * @param seekPosition seek position strategy. + */ + private record OffsetMetadata(Long offset, boolean relativeToCurrent, SeekPosition seekPosition) { } private class StopCallback implements BiConsumer {