diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java index 3280e51574..9f52d410e5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java @@ -35,6 +35,7 @@ import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention; +import org.springframework.kafka.transaction.KafkaAwareTransactionManager; import org.springframework.lang.Nullable; import org.springframework.scheduling.TaskScheduler; import org.springframework.transaction.PlatformTransactionManager; @@ -256,8 +257,11 @@ public enum EOSMode { private double idleBeforeDataMultiplier = DEFAULT_IDLE_BEFORE_DATA_MULTIPLIER; + @Deprecated(since = "3.1") private PlatformTransactionManager transactionManager; + private KafkaAwareTransactionManager kafkaAwareTransactionManager; + private boolean batchRecoverAfterRollback = false; private int monitorInterval = DEFAULT_MONITOR_INTERVAL; @@ -524,6 +528,7 @@ public Long getIdlePartitionEventInterval() { return this.idlePartitionEventInterval; } + @Deprecated(since = "3.1") @Nullable public PlatformTransactionManager getTransactionManager() { return this.transactionManager; @@ -541,10 +546,26 @@ public PlatformTransactionManager getTransactionManager() { * @since 1.3 * @see #setAckMode(AckMode) */ + @Deprecated(since = "3.1") public void setTransactionManager(@Nullable PlatformTransactionManager transactionManager) { this.transactionManager = transactionManager; } + @Nullable + public KafkaAwareTransactionManager getKafkaAwareTransactionManager() { + return this.kafkaAwareTransactionManager; + } + + /** + * Set the transaction manager to start a transaction; replace {@link #setTransactionManager}. + * @param kafkaAwareTransactionManager the transaction manager. + * @since 3.1 + */ + public void setKafkaAwareTransactionManager(@Nullable KafkaAwareTransactionManager kafkaAwareTransactionManager) { + this.kafkaAwareTransactionManager = kafkaAwareTransactionManager; + } + + public boolean isBatchRecoverAfterRollback() { return this.batchRecoverAfterRollback; } @@ -1060,6 +1081,9 @@ public String toString() { + (this.transactionManager != null ? "\n transactionManager=" + this.transactionManager : "") + + (this.kafkaAwareTransactionManager != null + ? "\n kafkaAwareTransactionManager=" + this.kafkaAwareTransactionManager + : "") + "\n monitorInterval=" + this.monitorInterval + (this.scheduler != null ? "\n scheduler=" + this.scheduler : "") + "\n noPollThreshold=" + this.noPollThreshold diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 05a9f6fa16..0cc51514ef 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -683,7 +683,11 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final CommonErrorHandler commonErrorHandler; - private final PlatformTransactionManager transactionManager = this.containerProperties.getTransactionManager(); + @SuppressWarnings("deprecation") + private final PlatformTransactionManager transactionManager = + this.containerProperties.getKafkaAwareTransactionManager() != null ? + this.containerProperties.getKafkaAwareTransactionManager() : + this.containerProperties.getTransactionManager(); @SuppressWarnings(RAWTYPES) private final KafkaAwareTransactionManager kafkaTxManager = @@ -2993,7 +2997,6 @@ private void sendOffsetsToTransaction() { doSendOffsets(this.producer, commits); } - @SuppressWarnings("deprecation") private void doSendOffsets(Producer prod, Map commits) { prod.sendOffsetsToTransaction(commits, this.consumer.groupMetadata()); if (this.fixTxOffsets) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java index ae12c8e6aa..9e2b9ea133 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java @@ -327,7 +327,7 @@ protected KafkaConsumer createKafkaConsumer(Map assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo("2"); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Test public void testNestedTxProducerIsCached() throws Exception { Map producerProps = KafkaTestUtils.producerProps(this.embeddedKafka); @@ -376,6 +376,55 @@ public void testNestedTxProducerIsCached() throws Exception { } } + @SuppressWarnings("unchecked") + @Test + public void testNestedTxProducerIsCachedByKWTM() throws Exception { + Map producerProps = KafkaTestUtils.producerProps(this.embeddedKafka); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(producerProps); + KafkaTemplate template = new KafkaTemplate<>(pf); + DefaultKafkaProducerFactory pfTx = new DefaultKafkaProducerFactory<>(producerProps); + pfTx.setTransactionIdPrefix("fooTx."); + KafkaOperations templateTx = new KafkaTemplate<>(pfTx); + Map consumerProps = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); + AtomicReference> wrapped = new AtomicReference<>(); + cf.addPostProcessor(consumer -> { + ProxyFactory prox = new ProxyFactory(); + prox.setTarget(consumer); + @SuppressWarnings("unchecked") + Consumer proxy = (Consumer) prox.getProxy(); + wrapped.set(proxy); + return proxy; + }); + ContainerProperties containerProps = new ContainerProperties("txCache1"); + CountDownLatch latch = new CountDownLatch(1); + containerProps.setMessageListener((MessageListener) r -> { + templateTx.executeInTransaction(t -> t.send("txCacheSendFromListener", "bar")); + templateTx.executeInTransaction(t -> t.send("txCacheSendFromListener", "baz")); + latch.countDown(); + }); + KafkaTransactionManager tm = new KafkaTransactionManager<>(pfTx); + containerProps.setKafkaAwareTransactionManager(tm); + KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, + containerProps); + container.start(); + try { + CompletableFuture> future = template.send("txCache1", "foo"); + future.get(10, TimeUnit.SECONDS); + pf.getCache(); + assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", Map.class)).hasSize(0); + assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(1); + assertThat(pfTx.getCache()).hasSize(1); + assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer")).isSameAs(wrapped.get()); + } + finally { + container.stop(); + pf.destroy(); + pfTx.destroy(); + } + } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test void listener() { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/CommitOnAssignmentTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/CommitOnAssignmentTests.java index 7de877420a..a69198b3a5 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/CommitOnAssignmentTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/CommitOnAssignmentTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 the original author or authors. + * Copyright 2020-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -105,7 +105,7 @@ void testLastOnly() throws InterruptedException { assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue(); } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) @Test void testLatestOnlyTx() throws InterruptedException { ContainerProperties props = this.registry.getListenerContainer("foo").getContainerProperties(); @@ -125,7 +125,7 @@ void testLatestOnlyTx() throws InterruptedException { assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) @Test void testLatestOnlyNoTx() throws InterruptedException { ContainerProperties props = this.registry.getListenerContainer("foo").getContainerProperties(); @@ -135,13 +135,48 @@ void testLatestOnlyNoTx() throws InterruptedException { KafkaTransactionManager tm = new KafkaTransactionManager<>(pf); Producer producer = mock(Producer.class); given(pf.createProducer(any())).willReturn(producer); - CountDownLatch latch = new CountDownLatch(1); props.setTransactionManager(tm); this.registry.start(); assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue(); verify(producer, never()).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class)); } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + void testLatestOnlyTxByKATM() throws InterruptedException { + ContainerProperties props = this.registry.getListenerContainer("foo").getContainerProperties(); + props.setAssignmentCommitOption(AssignmentCommitOption.LATEST_ONLY); + ProducerFactory pf = mock(ProducerFactory.class, withSettings().verboseLogging()); + given(pf.transactionCapable()).willReturn(true); + KafkaTransactionManager tm = new KafkaTransactionManager<>(pf); + Producer producer = mock(Producer.class); + given(pf.createProducer(any())).willReturn(producer); + CountDownLatch latch = new CountDownLatch(1); + willAnswer(inv -> { + latch.countDown(); + return null; + }).given(producer).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class)); + props.setKafkaAwareTransactionManager(tm); + this.registry.start(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + void testLatestOnlyNoTxByKATM() throws InterruptedException { + ContainerProperties props = this.registry.getListenerContainer("foo").getContainerProperties(); + props.setAssignmentCommitOption(AssignmentCommitOption.LATEST_ONLY_NO_TX); + ProducerFactory pf = mock(ProducerFactory.class, withSettings().verboseLogging()); + given(pf.transactionCapable()).willReturn(true); + KafkaTransactionManager tm = new KafkaTransactionManager<>(pf); + Producer producer = mock(Producer.class); + given(pf.createProducer(any())).willReturn(producer); + props.setKafkaAwareTransactionManager(tm); + this.registry.start(); + assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue(); + verify(producer, never()).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class)); + } + @Configuration @EnableKafka public static class Config { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java index 2b82126b81..fcd6b2d108 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java @@ -507,7 +507,7 @@ void testBatchInterceptBeforeTx1() throws InterruptedException { testIntercept(true, null, true); } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batch) throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); final Consumer consumer = mock(Consumer.class); @@ -662,7 +662,7 @@ public void failure(ConsumerRecords records, Exception exception, Consumer consu } @Test - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) void testInterceptInTxNonKafkaTM() throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); final Consumer consumer = mock(Consumer.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java index cc33e586cd..6138b110d3 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -240,7 +240,7 @@ public Consumer consumer() { return consumer; } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTXTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTXTests.java index 96f4604200..f8f960ad02 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTXTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTXTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -241,7 +241,7 @@ public Consumer consumer() { return consumer; } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTests.java index fc0a781ff6..514e4eb175 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -132,6 +132,7 @@ void withFilter() throws Exception { this.registry.stop(); } + @SuppressWarnings("deprecation") @Test void defaults() { Map props = KafkaTestUtils.consumerProps("sbpp", "false", this.broker); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxRollbackTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxRollbackTests.java index 1d3cd8dab7..0796d3eba2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxRollbackTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxRollbackTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -214,7 +214,7 @@ public Consumer consumer() { return consumer; } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxTests.java index 859bfc25be..e181e98563 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -200,7 +200,7 @@ public Consumer consumer() { return consumer; } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java index 3009951df6..a624401837 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java @@ -299,7 +299,7 @@ private void testConsumeAndProduceTransactionGuts(boolean handleError, AckMode a assertThat(stopEvent.get().getSource()).isSameAs(container); } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) @Test public void testConsumeAndProduceTransactionRollback() throws Exception { Consumer consumer = mock(Consumer.class); @@ -370,7 +370,7 @@ public void testConsumeAndProduceTransactionRollback() throws Exception { assertThat(delivery.get()).isNotNull(); } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) @Test public void testConsumeAndProduceTransactionRollbackBatch() throws Exception { Consumer consumer = mock(Consumer.class); @@ -619,7 +619,7 @@ public void testFixLagOtherTM() throws InterruptedException { testFixLagGuts(topic7, 2); } - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "deprecation" }) private void testFixLagGuts(String topic, int whichTm) throws InterruptedException { logger.info("Start testFixLag"); Map props = KafkaTestUtils.consumerProps("txTest2", "false", embeddedKafka); @@ -676,7 +676,7 @@ private void testFixLagGuts(String topic, int whichTm) throws InterruptedExcepti pf.destroy(); } - @SuppressWarnings({ "unchecked"}) + @SuppressWarnings({ "unchecked", "deprecation" }) @Test public void testMaxFailures() throws Exception { logger.info("Start testMaxFailures"); @@ -786,7 +786,7 @@ public void accept(ConsumerRecord record, Consumer consumer, Excepti logger.info("Stop testMaxAttempts"); } - @SuppressWarnings({ "unchecked"}) + @SuppressWarnings({ "unchecked", "deprecation" }) @Test public void testBatchListenerMaxFailuresOnRecover() throws Exception { @@ -908,7 +908,7 @@ public void accept(ConsumerRecord record, Consumer consumer, Excepti logger.info("Stop testBatchListenerMaxFailures"); } - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "deprecation" }) @Test public void testRollbackProcessorCrash() throws Exception { logger.info("Start testRollbackNoRetries"); @@ -971,7 +971,7 @@ public void testRollbackProcessorCrash() throws Exception { logger.info("Stop testRollbackNoRetries"); } - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "deprecation" }) @Test public void testBatchListenerRecoverAfterRollbackProcessorCrash() throws Exception { logger.info("Start testBatchListenerRollbackNoRetries"); @@ -999,7 +999,7 @@ public void testBatchListenerRecoverAfterRollbackProcessorCrash() throws Excepti } }); - @SuppressWarnings({ "rawtypes" }) + @SuppressWarnings({ "rawtypes"}) KafkaTransactionManager tm = new KafkaTransactionManager(pf); containerProps.setTransactionManager(tm); KafkaMessageListenerContainer container =