diff --git a/build.gradle b/build.gradle index f2a6a0a8bc..10089389c6 100644 --- a/build.gradle +++ b/build.gradle @@ -74,7 +74,7 @@ ext { springBootVersion = '2.6.7' // docs module springDataVersion = '2022.0.0-M5' springRetryVersion = '1.3.3' - springVersion = '6.0.0-M5' + springVersion = '6.0.0-SNAPSHOT' zookeeperVersion = '3.6.3' idPrefix = 'kafka' diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java index f451d94b2c..4c1c3ff32c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java @@ -33,7 +33,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationEventPublisher; -import org.springframework.core.task.AsyncListenableTaskExecutor; +import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.support.TopicPartitionOffset; @@ -62,7 +62,7 @@ public class ConcurrentMessageListenerContainer extends AbstractMessageLis private final List> containers = new ArrayList<>(); - private final List executors = new ArrayList<>(); + private final List executors = new ArrayList<>(); private int concurrency = 1; @@ -237,7 +237,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer { }); }); - AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor(); + AsyncTaskExecutor exec = container.getContainerProperties().getListenerTaskExecutor(); if (exec == null) { if ((this.executors.size() > index)) { exec = this.executors.get(index); @@ -246,7 +246,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer listener = (GenericMessageListener) messageListener; ListenerType listenerType = determineListenerType(listener); this.listenerConsumer = new ListenerConsumer(listener, listenerType); setRunning(true); this.startLatch = new CountDownLatch(1); - this.listenerConsumerFuture = consumerExecutor - .submitListenable(this.listenerConsumer); + this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer); try { - if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) { + if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), + TimeUnit.MILLISECONDS)) { + this.logger.error("Consumer thread failed to start - does the configured task executor " + "have enough threads to support all containers and concurrency?"); publishConsumerFailedToStart(); @@ -403,7 +404,7 @@ private ListenerType determineListenerType(GenericMessageListener listener) { @Override protected void doStop(final Runnable callback, boolean normal) { if (isRunning()) { - this.listenerConsumerFuture.addCallback(new StopCallback(callback)); + this.listenerConsumerFuture.whenComplete(new StopCallback(callback)); setRunning(false); this.listenerConsumer.wakeIfNecessaryForStop(); setStoppedNormally(normal); @@ -3712,7 +3713,7 @@ private static final class OffsetMetadata { } - private class StopCallback implements ListenableFutureCallback { + private class StopCallback implements BiConsumer { private final Runnable callback; @@ -3721,20 +3722,19 @@ private class StopCallback implements ListenableFutureCallback { } @Override - public void onFailure(Throwable e) { - KafkaMessageListenerContainer.this.logger - .error(e, "Error while stopping the container: "); - if (this.callback != null) { - this.callback.run(); + public void accept(Object result, @Nullable Throwable throwable) { + if (throwable != null) { + KafkaMessageListenerContainer.this.logger.error(throwable, "Error while stopping the container"); + if (this.callback != null) { + this.callback.run(); + } } - } - - @Override - public void onSuccess(Object result) { - KafkaMessageListenerContainer.this.logger - .debug(() -> KafkaMessageListenerContainer.this + " stopped normally"); - if (this.callback != null) { - this.callback.run(); + else { + KafkaMessageListenerContainer.this.logger + .debug(() -> KafkaMessageListenerContainer.this + " stopped normally"); + if (this.callback != null) { + this.callback.run(); + } } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java index ecb4709d1a..a118480f12 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -58,7 +59,6 @@ import org.springframework.kafka.transaction.KafkaTransactionManager; import org.springframework.transaction.CannotCreateTransactionException; import org.springframework.transaction.support.TransactionTemplate; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -71,7 +71,7 @@ public class DefaultKafkaProducerFactoryTests { @Test void testProducerClosedAfterBadTransition() throws Exception { final Producer producer = mock(Producer.class); - given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(producer.send(any(), any())).willReturn(new CompletableFuture()); DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) { @Override diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java index 30d3db4886..b5b2e4933b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java @@ -96,7 +96,6 @@ import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -422,7 +421,7 @@ void testWithCallbackFailure() throws Exception { willAnswer(inv -> { Callback callback = inv.getArgument(1); callback.onCompletion(null, new RuntimeException("test")); - return new SettableListenableFuture(); + return new CompletableFuture(); }).given(producer).send(any(), any()); ProducerFactory pf = mock(ProducerFactory.class); given(pf.createProducer()).willReturn(producer); @@ -449,7 +448,7 @@ void testWithCallbackFailureFunctional() throws Exception { willAnswer(inv -> { Callback callback = inv.getArgument(1); callback.onCompletion(null, new RuntimeException("test")); - return new SettableListenableFuture(); + return new CompletableFuture(); }).given(producer).send(any(), any()); ProducerFactory pf = mock(ProducerFactory.class); given(pf.createProducer()).willReturn(producer); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index 8ffdef1cf9..14f0522be2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -39,6 +39,7 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; @@ -79,7 +80,6 @@ import org.springframework.transaction.support.AbstractPlatformTransactionManager; import org.springframework.transaction.support.DefaultTransactionStatus; import org.springframework.transaction.support.TransactionTemplate; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -316,10 +316,10 @@ public void testTransactionSynchronizationExceptionOnCommit() { public void testDeadLetterPublisherWhileTransactionActive() { @SuppressWarnings("unchecked") Producer producer1 = mock(Producer.class); - given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(producer1.send(any(), any())).willReturn(new CompletableFuture<>()); @SuppressWarnings("unchecked") Producer producer2 = mock(Producer.class); - given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(producer2.send(any(), any())).willReturn(new CompletableFuture<>()); producer1.initTransactions(); @SuppressWarnings("unchecked") @@ -503,10 +503,10 @@ public void testAbort() { public void testExecuteInTransactionNewInnerTx() { @SuppressWarnings("unchecked") Producer producer1 = mock(Producer.class); - given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(producer1.send(any(), any())).willReturn(new CompletableFuture<>()); @SuppressWarnings("unchecked") Producer producer2 = mock(Producer.class); - given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(producer2.send(any(), any())).willReturn(new CompletableFuture<>()); producer1.initTransactions(); AtomicBoolean first = new AtomicBoolean(true); @@ -596,7 +596,7 @@ public static class DeclarativeConfig { @Bean public Producer producer1() { Producer mock = mock(Producer.class); - given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(mock.send(any(), any())).willReturn(new CompletableFuture<>()); return mock; } @@ -604,7 +604,7 @@ public Producer producer1() { @Bean public Producer producer2() { Producer mock = mock(Producer.class); - given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(mock.send(any(), any())).willReturn(new CompletableFuture<>()); return mock; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/RoutingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/RoutingKafkaTemplateTests.java index dfd89a0c0a..361d68f47f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/RoutingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/RoutingKafkaTemplateTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 the original author or authors. + * Copyright 2020-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,13 +25,13 @@ import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; import org.apache.kafka.clients.producer.Producer; import org.junit.jupiter.api.Test; import org.springframework.kafka.test.utils.KafkaTestUtils; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -44,9 +44,9 @@ public class RoutingKafkaTemplateTests { @Test public void routing() { Producer p1 = mock(Producer.class); - given(p1.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(p1.send(any(), any())).willReturn(new CompletableFuture<>()); Producer p2 = mock(Producer.class); - given(p2.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(p2.send(any(), any())).willReturn(new CompletableFuture<>()); ProducerFactory pf1 = mock(ProducerFactory.class); ProducerFactory pf2 = mock(ProducerFactory.class); given(pf1.createProducer()).willReturn(p1); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java index 5ca9af94f3..4757cd5019 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java @@ -101,7 +101,7 @@ void testThreadStarvation() throws InterruptedException { ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor(); exec.setCorePoolSize(1); exec.afterPropertiesSet(); - containerProperties.setConsumerTaskExecutor(exec); + containerProperties.setListenerTaskExecutor(exec); containerProperties.setConsumerStartTimeout(Duration.ofMillis(50)); ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 8a04d4de08..9bf4549b13 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -227,7 +227,7 @@ public void testDelegateType() throws Exception { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(10); scheduler.initialize(); - containerProps.setConsumerTaskExecutor(scheduler); + containerProps.setListenerTaskExecutor(scheduler); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("delegate"); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java index 1aab89ce56..43927d62a5 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java @@ -44,6 +44,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -101,7 +102,6 @@ import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.transaction.support.DefaultTransactionStatus; import org.springframework.util.backoff.FixedBackOff; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -210,7 +210,7 @@ private void testConsumeAndProduceTransactionGuts(boolean handleError, AckMode a return null; }).given(producer).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class)); } - given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(producer.send(any(), any())).willReturn(new CompletableFuture<>()); final CountDownLatch closeLatch = new CountDownLatch(2); willAnswer(i -> { closeLatch.countDown(); @@ -455,7 +455,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception { ConsumerFactory cf = mock(ConsumerFactory.class); willReturn(consumer).given(cf).createConsumer("group", "", null, KafkaTestUtils.defaultPropertyOverrides()); Producer producer = mock(Producer.class); - given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>()); + given(producer.send(any(), any())).willReturn(new CompletableFuture<>()); final CountDownLatch closeLatch = new CountDownLatch(1); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java index 0aaec40f7e..69e09770d3 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java @@ -97,7 +97,6 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -554,7 +553,7 @@ public void testAggregateOrphansNotStored() throws Exception { willAnswer(invocation -> { ProducerRecord rec = invocation.getArgument(0); correlation.set(rec.headers().lastHeader(KafkaHeaders.CORRELATION_ID).value()); - return new SettableListenableFuture<>(); + return new CompletableFuture<>(); }).given(producer).send(any(), any()); AggregatingReplyingKafkaTemplate template = new AggregatingReplyingKafkaTemplate(pf, container, (list, timeout) -> true); @@ -693,8 +692,8 @@ void nullDuration() throws Exception { Producer producer = mock(Producer.class); willAnswer(invocation -> { Callback callback = invocation.getArgument(1); - SettableListenableFuture future = new SettableListenableFuture<>(); - future.set("done"); + CompletableFuture future = new CompletableFuture<>(); + future.complete("done"); callback.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0L, 0, 0L, 0, 0), null); return future; }).given(producer).send(any(), any()); @@ -717,7 +716,7 @@ void requestTimeoutWithMessage() throws Exception { ProducerFactory pf = mock(ProducerFactory.class); Producer producer = mock(Producer.class); willAnswer(invocation -> { - return new SettableListenableFuture<>(); + return new CompletableFuture<>(); }).given(producer).send(any(), any()); given(pf.createProducer()).willReturn(producer); GenericMessageListenerContainer container = mock(GenericMessageListenerContainer.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java index 1f72f8c4ac..c443738b67 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -72,7 +73,6 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Artem Bilan @@ -105,7 +105,7 @@ public class KafkaStreamsTests { private KafkaTemplate kafkaTemplate; @Autowired - private SettableListenableFuture> resultFuture; + private CompletableFuture> resultFuture; @Autowired private StreamsBuilderFactoryBean streamsBuilderFactoryBean; @@ -264,13 +264,13 @@ public ConsumerFactory consumerFactory() { } @Bean - public SettableListenableFuture> resultFuture() { - return new SettableListenableFuture<>(); + public CompletableFuture> resultFuture() { + return new CompletableFuture<>(); } @KafkaListener(topics = "${streaming.topic.two}") public void listener(ConsumerRecord payload) { - resultFuture().set(payload); + resultFuture().complete(payload); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java index 566d33a857..bb4098c95f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/RecoveringDeserializationExceptionHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -65,7 +66,6 @@ import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -81,7 +81,7 @@ public class RecoveringDeserializationExceptionHandlerTests { private KafkaTemplate kafkaTemplate; @Autowired - private SettableListenableFuture> resultFuture; + private CompletableFuture> resultFuture; @Test void viaStringProperty() { @@ -237,13 +237,13 @@ public ConsumerFactory consumerFactory() { } @Bean - public SettableListenableFuture> resultFuture() { - return new SettableListenableFuture<>(); + public CompletableFuture> resultFuture() { + return new CompletableFuture<>(); } @KafkaListener(topics = "recovererDLQ") public void listener(ConsumerRecord payload) { - resultFuture().set(payload); + resultFuture().complete(payload); } }