diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 6364135b12..71b4c3f99c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -3080,6 +3080,9 @@ else if (!this.autoCommit && (!this.isAnyManualAck || commitRecovered)) { } private void sendOffsetsToTransaction() { + if (this.kafkaTxManager == null || TransactionSynchronizationManager.getResource(this.kafkaTxManager.getProducerFactory()) == null) { + return; + } handleAcks(); Map commits = buildCommits(); this.commitLogger.log(() -> "Sending offsets to transaction: " + commits); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java index 40eb235037..92f175b39b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java @@ -47,6 +47,7 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -75,6 +76,9 @@ import org.springframework.kafka.transaction.KafkaTransactionManager; import org.springframework.messaging.MessageHeaders; import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionExecution; +import org.springframework.transaction.TransactionExecutionListener; +import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.util.backoff.FixedBackOff; @@ -140,6 +144,8 @@ public class TransactionalContainerTests { public static final String topic10 = "txTopic10"; + public static final String topic11 = "txTopic11"; + private static EmbeddedKafkaBroker embeddedKafka; @BeforeAll @@ -1148,4 +1154,82 @@ public void onMessage(List> data) { container.stop(); } + @Test + void testSendOffsetOnlyOnActiveTransaction() throws InterruptedException { + // init producer + Map producerProperties = KafkaTestUtils.producerProps(embeddedKafka); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(producerProperties); + pf.setTransactionIdPrefix("testSendOffsetOnlyOnActiveTransaction.recordListener"); + final KafkaTemplate template = new KafkaTemplate<>(pf); + + // init consumer + String group = "testSendOffsetOnlyOnActiveTransaction"; + Map consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka, group, false); + consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProperties); + ContainerProperties containerProps = new ContainerProperties(topic11); + containerProps.setPollTimeout(10_000); + containerProps.setMessageListener(new MessageListener() { + @Transactional("testSendOffsetOnlyOnActiveTransaction") + @Override + public void onMessage(ConsumerRecord data) { + } + }); + + // init container + KafkaTransactionManager tm = new KafkaTransactionManager<>(pf); + AtomicInteger txCount = new AtomicInteger(0); + tm.addListener(new TransactionExecutionListener() { + @Override + public void afterCommit(TransactionExecution transaction, @Nullable Throwable commitFailure) { + txCount.incrementAndGet(); + TransactionExecutionListener.super.afterCommit(transaction, commitFailure); + } + }); + containerProps.setKafkaAwareTransactionManager(tm); + + KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); + container.setBeanName("testSendOffsetOnlyOnActiveTransaction"); + final var interceptorLatch = new AtomicReference<>(new CountDownLatch(1)); + container.setRecordInterceptor(new RecordInterceptor() { + boolean isFirst = true; + + @Override + public @Nullable ConsumerRecord intercept( + ConsumerRecord record, + Consumer consumer) { + if (isFirst) { + isFirst = false; + return record; + } + return null; + } + + @Override + public void afterRecord( + ConsumerRecord record, + Consumer consumer) { + interceptorLatch.get().countDown(); + } + }); + container.start(); + + template.executeInTransaction(t -> { + template.send(new ProducerRecord<>(topic11, 0, 0, "bar1")); + return null; + }); + assertThat(interceptorLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(txCount.get()).isEqualTo(1); + + interceptorLatch.set(new CountDownLatch(1)); + template.executeInTransaction(t -> { + template.send(new ProducerRecord<>(topic11, 0, 0, "bar2")); + return null; + }); + assertThat(interceptorLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(txCount.get()).isEqualTo(1); + + container.stop(); + pf.destroy(); + } }