Skip to content

GH-2357: Remove Remaining Uses of ListenableFuture #2367

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ ext {
springBootVersion = '2.6.7' // docs module
springDataVersion = '2022.0.0-M5'
springRetryVersion = '1.3.3'
springVersion = '6.0.0-M5'
springVersion = '6.0.0-SNAPSHOT'
zookeeperVersion = '3.6.3'

idPrefix = 'kafka'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.support.TopicPartitionOffset;
Expand Down Expand Up @@ -62,7 +62,7 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis

private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

private final List<AsyncListenableTaskExecutor> executors = new ArrayList<>();
private final List<AsyncTaskExecutor> executors = new ArrayList<>();

private int concurrency = 1;

Expand Down Expand Up @@ -237,7 +237,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer<K,
stopAbnormally(() -> {
});
});
AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();
AsyncTaskExecutor exec = container.getContainerProperties().getListenerTaskExecutor();
if (exec == null) {
if ((this.executors.size() > index)) {
exec = this.executors.get(index);
Expand All @@ -246,7 +246,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer<K,
exec = new SimpleAsyncTaskExecutor(beanName + "-C-");
this.executors.add(exec);
}
container.getContainerProperties().setConsumerTaskExecutor(exec);
container.getContainerProperties().setListenerTaskExecutor(exec);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.springframework.aop.framework.Advised;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
Expand Down Expand Up @@ -231,7 +231,7 @@ public enum EOSMode {
/**
* The executor for threads that poll the consumer.
*/
private AsyncListenableTaskExecutor consumerTaskExecutor;
private AsyncTaskExecutor listenerTaskExecutor;

/**
* The timeout for shutting down the container. This is the maximum amount of
Expand Down Expand Up @@ -380,10 +380,11 @@ public void setAckTime(long ackTime) {

/**
* Set the executor for threads that poll the consumer.
* @param consumerTaskExecutor the executor
* @param listenerTaskExecutor the executor
* @since 2.8.9
*/
public void setConsumerTaskExecutor(@Nullable AsyncListenableTaskExecutor consumerTaskExecutor) {
this.consumerTaskExecutor = consumerTaskExecutor;
public void setListenerTaskExecutor(@Nullable AsyncTaskExecutor listenerTaskExecutor) {
this.listenerTaskExecutor = listenerTaskExecutor;
}

/**
Expand Down Expand Up @@ -466,8 +467,8 @@ public Object getMessageListener() {
* @return the executor.
*/
@Nullable
public AsyncListenableTaskExecutor getConsumerTaskExecutor() {
return this.consumerTaskExecutor;
public AsyncTaskExecutor getListenerTaskExecutor() {
return this.listenerTaskExecutor;
}

public long getShutdownTimeout() {
Expand Down Expand Up @@ -919,8 +920,8 @@ public String toString() {
+ "\n ackCount=" + this.ackCount
+ "\n ackTime=" + this.ackTime
+ "\n messageListener=" + this.messageListener
+ (this.consumerTaskExecutor != null
? "\n consumerTaskExecutor=" + this.consumerTaskExecutor
+ (this.listenerTaskExecutor != null
? "\n listenerTaskExecutor=" + this.listenerTaskExecutor
: "")
+ "\n shutdownTimeout=" + this.shutdownTimeout
+ "\n idleEventInterval="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -72,7 +74,7 @@
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.ConsumerFactory;
Expand Down Expand Up @@ -123,8 +125,6 @@
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;


/**
Expand Down Expand Up @@ -175,7 +175,7 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count

private volatile ListenerConsumer listenerConsumer;

private volatile ListenableFuture<?> listenerConsumerFuture;
private volatile CompletableFuture<Void> listenerConsumerFuture;

private volatile CountDownLatch startLatch = new CountDownLatch(1);

Expand Down Expand Up @@ -350,21 +350,22 @@ protected void doStart() {
checkAckMode(containerProperties);

Object messageListener = containerProperties.getMessageListener();
AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
AsyncTaskExecutor consumerExecutor = containerProperties.getListenerTaskExecutor();
if (consumerExecutor == null) {
consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
containerProperties.setListenerTaskExecutor(consumerExecutor);
}
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
ListenerType listenerType = determineListenerType(listener);
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
setRunning(true);
this.startLatch = new CountDownLatch(1);
this.listenerConsumerFuture = consumerExecutor
.submitListenable(this.listenerConsumer);
this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer);
try {
if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(),
TimeUnit.MILLISECONDS)) {

this.logger.error("Consumer thread failed to start - does the configured task executor "
+ "have enough threads to support all containers and concurrency?");
publishConsumerFailedToStart();
Expand Down Expand Up @@ -403,7 +404,7 @@ private ListenerType determineListenerType(GenericMessageListener<?> listener) {
@Override
protected void doStop(final Runnable callback, boolean normal) {
if (isRunning()) {
this.listenerConsumerFuture.addCallback(new StopCallback(callback));
this.listenerConsumerFuture.whenComplete(new StopCallback(callback));
setRunning(false);
this.listenerConsumer.wakeIfNecessaryForStop();
setStoppedNormally(normal);
Expand Down Expand Up @@ -3712,7 +3713,7 @@ private static final class OffsetMetadata {

}

private class StopCallback implements ListenableFutureCallback<Object> {
private class StopCallback implements BiConsumer<Object, Throwable> {

private final Runnable callback;

Expand All @@ -3721,20 +3722,19 @@ private class StopCallback implements ListenableFutureCallback<Object> {
}

@Override
public void onFailure(Throwable e) {
KafkaMessageListenerContainer.this.logger
.error(e, "Error while stopping the container: ");
if (this.callback != null) {
this.callback.run();
public void accept(Object result, @Nullable Throwable throwable) {
if (throwable != null) {
KafkaMessageListenerContainer.this.logger.error(throwable, "Error while stopping the container");
if (this.callback != null) {
this.callback.run();
}
}
}

@Override
public void onSuccess(Object result) {
KafkaMessageListenerContainer.this.logger
.debug(() -> KafkaMessageListenerContainer.this + " stopped normally");
if (this.callback != null) {
this.callback.run();
else {
KafkaMessageListenerContainer.this.logger
.debug(() -> KafkaMessageListenerContainer.this + " stopped normally");
if (this.callback != null) {
this.callback.run();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -58,7 +59,6 @@
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* @author Gary Russell
Expand All @@ -71,7 +71,7 @@ public class DefaultKafkaProducerFactoryTests {
@Test
void testProducerClosedAfterBadTransition() throws Exception {
final Producer producer = mock(Producer.class);
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer.send(any(), any())).willReturn(new CompletableFuture());
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* @author Gary Russell
Expand Down Expand Up @@ -422,7 +421,7 @@ void testWithCallbackFailure() throws Exception {
willAnswer(inv -> {
Callback callback = inv.getArgument(1);
callback.onCompletion(null, new RuntimeException("test"));
return new SettableListenableFuture<RecordMetadata>();
return new CompletableFuture<RecordMetadata>();
}).given(producer).send(any(), any());
ProducerFactory<Integer, String> pf = mock(ProducerFactory.class);
given(pf.createProducer()).willReturn(producer);
Expand All @@ -449,7 +448,7 @@ void testWithCallbackFailureFunctional() throws Exception {
willAnswer(inv -> {
Callback callback = inv.getArgument(1);
callback.onCompletion(null, new RuntimeException("test"));
return new SettableListenableFuture<RecordMetadata>();
return new CompletableFuture<RecordMetadata>();
}).given(producer).send(any(), any());
ProducerFactory<Integer, String> pf = mock(ProducerFactory.class);
given(pf.createProducer()).willReturn(producer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -79,7 +80,6 @@
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* @author Gary Russell
Expand Down Expand Up @@ -316,10 +316,10 @@ public void testTransactionSynchronizationExceptionOnCommit() {
public void testDeadLetterPublisherWhileTransactionActive() {
@SuppressWarnings("unchecked")
Producer<Object, Object> producer1 = mock(Producer.class);
given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer1.send(any(), any())).willReturn(new CompletableFuture<>());
@SuppressWarnings("unchecked")
Producer<Object, Object> producer2 = mock(Producer.class);
given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer2.send(any(), any())).willReturn(new CompletableFuture<>());
producer1.initTransactions();

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -503,10 +503,10 @@ public void testAbort() {
public void testExecuteInTransactionNewInnerTx() {
@SuppressWarnings("unchecked")
Producer<Object, Object> producer1 = mock(Producer.class);
given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer1.send(any(), any())).willReturn(new CompletableFuture<>());
@SuppressWarnings("unchecked")
Producer<Object, Object> producer2 = mock(Producer.class);
given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer2.send(any(), any())).willReturn(new CompletableFuture<>());
producer1.initTransactions();
AtomicBoolean first = new AtomicBoolean(true);

Expand Down Expand Up @@ -596,15 +596,15 @@ public static class DeclarativeConfig {
@Bean
public Producer producer1() {
Producer mock = mock(Producer.class);
given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(mock.send(any(), any())).willReturn(new CompletableFuture<>());
return mock;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Producer producer2() {
Producer mock = mock(Producer.class);
given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(mock.send(any(), any())).willReturn(new CompletableFuture<>());
return mock;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,13 +25,13 @@

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;

import org.apache.kafka.clients.producer.Producer;
import org.junit.jupiter.api.Test;

import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* @author Gary Russell
Expand All @@ -44,9 +44,9 @@ public class RoutingKafkaTemplateTests {
@Test
public void routing() {
Producer<Object, Object> p1 = mock(Producer.class);
given(p1.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(p1.send(any(), any())).willReturn(new CompletableFuture<>());
Producer<Object, Object> p2 = mock(Producer.class);
given(p2.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(p2.send(any(), any())).willReturn(new CompletableFuture<>());
ProducerFactory<Object, Object> pf1 = mock(ProducerFactory.class);
ProducerFactory<Object, Object> pf2 = mock(ProducerFactory.class);
given(pf1.createProducer()).willReturn(p1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void testThreadStarvation() throws InterruptedException {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(1);
exec.afterPropertiesSet();
containerProperties.setConsumerTaskExecutor(exec);
containerProperties.setListenerTaskExecutor(exec);
containerProperties.setConsumerStartTimeout(Duration.ofMillis(50));
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(consumerFactory,
containerProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void testDelegateType() throws Exception {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10);
scheduler.initialize();
containerProps.setConsumerTaskExecutor(scheduler);
containerProps.setListenerTaskExecutor(scheduler);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("delegate");
Expand Down
Loading