diff --git a/app/src/main/java/org/astraea/app/concurrent/Executor.java b/app/src/main/java/org/astraea/app/concurrent/Executor.java deleted file mode 100644 index 58bb44862c..0000000000 --- a/app/src/main/java/org/astraea/app/concurrent/Executor.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.app.concurrent; - -@FunctionalInterface -public interface Executor extends AutoCloseable { - - /** - * @return the state of this executor - * @throws InterruptedException This is an expected exception if your executor needs to call - * blocking method. This exception is not printed to console. - */ - State execute() throws InterruptedException; - - /** close this executor. */ - @Override - default void close() {} - - /** - * If this executor is in blocking mode, this method offers a way to wake up executor to close. - */ - default void wakeup() {} -} diff --git a/app/src/main/java/org/astraea/app/concurrent/State.java b/app/src/main/java/org/astraea/app/concurrent/State.java deleted file mode 100644 index fcc7e39299..0000000000 --- a/app/src/main/java/org/astraea/app/concurrent/State.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.app.concurrent; - -public enum State { - /** this thread is done. ThreadPool will call close to release the thread. */ - DONE, - /** this thread is running */ - RUNNING -} diff --git a/app/src/main/java/org/astraea/app/concurrent/ThreadPool.java b/app/src/main/java/org/astraea/app/concurrent/ThreadPool.java deleted file mode 100644 index 927b4fc518..0000000000 --- a/app/src/main/java/org/astraea/app/concurrent/ThreadPool.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.app.concurrent; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * This class offers a simple way to manage the threads. All threads added to builder are not - * executing right now. Instead, all threads are starting when the pool is built. - */ -public interface ThreadPool extends AutoCloseable { - - // nothing to run. - ThreadPool EMPTY = - new ThreadPool() { - @Override - public void close() {} - - @Override - public void waitAll() {} - - @Override - public boolean isClosed() { - return true; - } - - @Override - public int size() { - return 0; - } - }; - - /** close all running threads. */ - @Override - void close(); - - /** wait all executors to be done. */ - void waitAll(); - - boolean isClosed(); - - /** @return the number of threads */ - int size(); - - static Builder builder() { - return new Builder(); - } - - class Builder { - private final List executors = new ArrayList<>(); - - private Builder() {} - - public Builder executor(Executor executor) { - return executors(List.of(executor)); - } - - public Builder executors(Collection executors) { - this.executors.addAll(Objects.requireNonNull(executors)); - return this; - } - - public ThreadPool build() { - if (executors.isEmpty()) return EMPTY; - var closed = new AtomicBoolean(false); - var latch = new CountDownLatch(executors.size()); - var service = Executors.newFixedThreadPool(executors.size()); - executors.forEach( - executor -> - service.execute( - () -> { - try { - while (!closed.get()) { - if (executor.execute() == State.DONE) break; - } - } catch (InterruptedException e) { - // swallow - } finally { - try { - executor.close(); - } finally { - latch.countDown(); - } - } - })); - return new ThreadPool() { - @Override - public void close() { - service.shutdownNow(); - closed.set(true); - executors.forEach(Executor::wakeup); - waitAll(); - } - - @Override - public void waitAll() { - try { - latch.await(); - } catch (InterruptedException e) { - // swallow - } - } - - @Override - public boolean isClosed() { - return closed.get(); - } - - @Override - public int size() { - return executors.size(); - } - }; - } - } -} diff --git a/app/src/test/java/org/astraea/app/admin/AdminTest.java b/app/src/test/java/org/astraea/app/admin/AdminTest.java index c317fabd63..b5d4bb3cf4 100644 --- a/app/src/test/java/org/astraea/app/admin/AdminTest.java +++ b/app/src/test/java/org/astraea/app/admin/AdminTest.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -37,8 +38,6 @@ import org.apache.kafka.common.errors.GroupNotEmptyException; import org.astraea.app.common.DataRate; import org.astraea.app.common.Utils; -import org.astraea.app.concurrent.State; -import org.astraea.app.concurrent.ThreadPool; import org.astraea.app.consumer.Consumer; import org.astraea.app.consumer.Deserializer; import org.astraea.app.producer.Producer; @@ -976,32 +975,29 @@ void testReassignmentWhenMovingPartitionToAnotherBroker() { try (var producer = Producer.of(bootstrapServers())) { var done = new AtomicBoolean(false); var data = new byte[1000]; - try (var pool = - ThreadPool.builder() - .executor( - () -> { - producer.sender().topic(topicName).key(data).value(data).run(); - return done.get() ? State.DONE : State.RUNNING; - }) - .build()) { - - try { - admin.migrator().topic(topicName).moveTo(List.of(nextBroker)); - var reassignment = - admin.reassignments(Set.of(topicName)).get(TopicPartition.of(topicName, 0)); - - // Don't verify the result if the migration is done - if (reassignment != null) { - Assertions.assertEquals(1, reassignment.from().size()); - var from = reassignment.from().iterator().next(); - Assertions.assertEquals(currentBroker, from.broker()); - Assertions.assertEquals(1, reassignment.to().size()); - var to = reassignment.to().iterator().next(); - Assertions.assertEquals(nextBroker, to.broker()); - } - } finally { - done.set(true); + var f = + CompletableFuture.runAsync( + () -> { + while (!done.get()) + producer.sender().topic(topicName).key(data).value(data).run(); + }); + try { + admin.migrator().topic(topicName).moveTo(List.of(nextBroker)); + var reassignment = + admin.reassignments(Set.of(topicName)).get(TopicPartition.of(topicName, 0)); + + // Don't verify the result if the migration is done + if (reassignment != null) { + Assertions.assertEquals(1, reassignment.from().size()); + var from = reassignment.from().iterator().next(); + Assertions.assertEquals(currentBroker, from.broker()); + Assertions.assertEquals(1, reassignment.to().size()); + var to = reassignment.to().iterator().next(); + Assertions.assertEquals(nextBroker, to.broker()); } + } finally { + done.set(true); + Utils.swallowException(f::get); } } } @@ -1027,33 +1023,31 @@ void testReassignmentWhenMovingPartitionToAnotherPath() { try (var producer = Producer.of(bootstrapServers())) { var done = new AtomicBoolean(false); var data = new byte[1000]; - try (var pool = - ThreadPool.builder() - .executor( - () -> { - producer.sender().topic(topicName).key(data).value(data).run(); - return done.get() ? State.DONE : State.RUNNING; - }) - .build()) { - - try { - admin.migrator().topic(topicName).moveTo(Map.of(currentReplica.broker(), nextPath)); - var reassignment = - admin.reassignments(Set.of(topicName)).get(TopicPartition.of(topicName, 0)); - // Don't verify the result if the migration is done - if (reassignment != null) { - Assertions.assertEquals(1, reassignment.from().size()); - var from = reassignment.from().iterator().next(); - Assertions.assertEquals(currentBroker, from.broker()); - Assertions.assertEquals(currentPath, from.path()); - Assertions.assertEquals(1, reassignment.to().size()); - var to = reassignment.to().iterator().next(); - Assertions.assertEquals(currentBroker, to.broker()); - Assertions.assertEquals(nextPath, to.path()); - } - } finally { - done.set(true); + var f = + CompletableFuture.runAsync( + () -> { + while (!done.get()) + producer.sender().topic(topicName).key(data).value(data).run(); + }); + + try { + admin.migrator().topic(topicName).moveTo(Map.of(currentReplica.broker(), nextPath)); + var reassignment = + admin.reassignments(Set.of(topicName)).get(TopicPartition.of(topicName, 0)); + // Don't verify the result if the migration is done + if (reassignment != null) { + Assertions.assertEquals(1, reassignment.from().size()); + var from = reassignment.from().iterator().next(); + Assertions.assertEquals(currentBroker, from.broker()); + Assertions.assertEquals(currentPath, from.path()); + Assertions.assertEquals(1, reassignment.to().size()); + var to = reassignment.to().iterator().next(); + Assertions.assertEquals(currentBroker, to.broker()); + Assertions.assertEquals(nextPath, to.path()); } + } finally { + done.set(true); + Utils.swallowException(f::get); } } } @@ -1070,27 +1064,24 @@ void testMultiReassignments() { try (var producer = Producer.of(bootstrapServers())) { var done = new AtomicBoolean(false); var data = new byte[1000]; - try (var pool = - ThreadPool.builder() - .executor( - () -> { - producer.sender().topic(topicName).key(data).value(data).run(); - return done.get() ? State.DONE : State.RUNNING; - }) - .build()) { - - try { - admin.migrator().topic(topicName).moveTo(brokers); - var reassignment = - admin.reassignments(Set.of(topicName)).get(TopicPartition.of(topicName, 0)); - // Don't verify the result if the migration is done - if (reassignment != null) { - Assertions.assertEquals(3, reassignment.from().size()); - Assertions.assertEquals(2, reassignment.to().size()); - } - } finally { - done.set(true); + var f = + CompletableFuture.runAsync( + () -> { + while (!done.get()) + producer.sender().topic(topicName).key(data).value(data).run(); + }); + try { + admin.migrator().topic(topicName).moveTo(brokers); + var reassignment = + admin.reassignments(Set.of(topicName)).get(TopicPartition.of(topicName, 0)); + // Don't verify the result if the migration is done + if (reassignment != null) { + Assertions.assertEquals(3, reassignment.from().size()); + Assertions.assertEquals(2, reassignment.to().size()); } + } finally { + done.set(true); + Utils.swallowException(f::get); } } } diff --git a/app/src/test/java/org/astraea/app/concurrent/ThreadPoolTest.java b/app/src/test/java/org/astraea/app/concurrent/ThreadPoolTest.java deleted file mode 100644 index c8dcf679b2..0000000000 --- a/app/src/test/java/org/astraea/app/concurrent/ThreadPoolTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.app.concurrent; - -import java.time.Duration; -import java.util.concurrent.atomic.AtomicInteger; -import org.astraea.app.common.Utils; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; - -public class ThreadPoolTest { - - private static class CountExecutor implements Executor { - - private final AtomicInteger executeCount = new AtomicInteger(); - private final AtomicInteger closeCount = new AtomicInteger(); - private final AtomicInteger wakeupCount = new AtomicInteger(); - - @Override - public State execute() { - executeCount.incrementAndGet(); - return State.RUNNING; - } - - @Override - public void close() { - closeCount.incrementAndGet(); - } - - @Override - public void wakeup() { - wakeupCount.incrementAndGet(); - } - } - - @Test - void testSubmitThread() { - var executor = new CountExecutor(); - try (var pool = ThreadPool.builder().executor(executor).build()) { - Utils.sleep(Duration.ofSeconds(2)); - } - Assertions.assertTrue(executor.executeCount.get() > 0); - Assertions.assertEquals(1, executor.closeCount.get()); - Assertions.assertEquals(1, executor.wakeupCount.get()); - } - - @Test - void testWaitAll() { - try (var pool = ThreadPool.builder().executor(() -> State.DONE).build()) { - pool.waitAll(); - } - } - - @Timeout(10) - @Test - void testInterrupt() { - var pool = - ThreadPool.builder() - .executor( - () -> { - Utils.sleep(Duration.ofSeconds(100)); - return State.DONE; - }) - .build(); - pool.close(); - Assertions.assertTrue(pool.isClosed()); - } - - @Test - void testEmpty() { - try (var empty = ThreadPool.builder().build()) { - Assertions.assertEquals(ThreadPool.EMPTY, empty); - Assertions.assertEquals(0, empty.size()); - Assertions.assertTrue(empty.isClosed()); - } - } -} diff --git a/app/src/test/java/org/astraea/app/consumer/RebalanceListenerTest.java b/app/src/test/java/org/astraea/app/consumer/RebalanceListenerTest.java index 7c0c1eb39a..9765124abe 100644 --- a/app/src/test/java/org/astraea/app/consumer/RebalanceListenerTest.java +++ b/app/src/test/java/org/astraea/app/consumer/RebalanceListenerTest.java @@ -18,14 +18,14 @@ import java.time.Duration; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.astraea.app.common.Utils; -import org.astraea.app.concurrent.State; -import org.astraea.app.concurrent.ThreadPool; import org.astraea.app.service.RequireBrokerCluster; import org.junit.jupiter.api.Test; public class RebalanceListenerTest extends RequireBrokerCluster { + @Test void testConsumerRebalanceListener() { var getAssignment = new AtomicInteger(0); @@ -35,16 +35,13 @@ void testConsumerRebalanceListener() { .bootstrapServers(bootstrapServers()) .consumerRebalanceListener(ignore -> getAssignment.incrementAndGet()) .build()) { - try (var threadPool = - ThreadPool.builder() - .executor( - () -> { - consumer.poll(Duration.ofSeconds(10)); - return State.DONE; - }) - .build()) { - Utils.waitFor(() -> getAssignment.get() == 1, Duration.ofSeconds(10)); - } + CompletableFuture.runAsync(() -> consumer.poll(Duration.ofSeconds(10))); + Utils.waitFor( + () -> { + consumer.poll(Duration.ofSeconds(1)); + return getAssignment.get() == 1; + }, + Duration.ofSeconds(10)); } } } diff --git a/app/src/test/java/org/astraea/app/metrics/collector/BeanCollectorTest.java b/app/src/test/java/org/astraea/app/metrics/collector/BeanCollectorTest.java index ab1bd14c85..c7ed83a11e 100644 --- a/app/src/test/java/org/astraea/app/metrics/collector/BeanCollectorTest.java +++ b/app/src/test/java/org/astraea/app/metrics/collector/BeanCollectorTest.java @@ -17,17 +17,14 @@ package org.astraea.app.metrics.collector; import java.time.Duration; -import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.astraea.app.common.Utils; -import org.astraea.app.concurrent.Executor; -import org.astraea.app.concurrent.State; -import org.astraea.app.concurrent.ThreadPool; import org.astraea.app.metrics.HasBeanObject; import org.astraea.app.metrics.KafkaMetrics; import org.astraea.app.metrics.jmx.BeanObject; @@ -41,13 +38,6 @@ public class BeanCollectorTest { private final BiFunction clientCreator = (host, port) -> mbeanClient; - private static Executor executor(Runnable runnable) { - return () -> { - runnable.run(); - return State.RUNNING; - }; - } - private static HasBeanObject createBeanObject() { var obj = new BeanObject("domain", Map.of(), Map.of()); return () -> obj; @@ -70,7 +60,17 @@ void testAddress() { @Test void theImmutableCurrent() { var collector = BeanCollector.builder().clientCreator(clientCreator).build(); - var receivers = receivers(collector); + var receivers = + IntStream.range(0, 3) + .mapToObj( + ignored -> + collector + .register() + .host("unknown") + .port(100) + .fetcher(client -> List.of(createBeanObject())) + .build()) + .collect(Collectors.toUnmodifiableList()); receivers.forEach( r -> Assertions.assertThrows( @@ -129,43 +129,21 @@ void testRegister() { Assertions.assertThrows(NullPointerException.class, () -> collector.register().fetcher(null)); } - private List receivers(BeanCollector collector) { - var receivers = new ArrayList(); - Runnable runnable = - () -> { - try { - var receiver = - collector - .register() - .host("unknown") - .port(100) - .fetcher(client -> List.of(createBeanObject())) - .build(); - synchronized (receivers) { - receivers.add(receiver); - } - } finally { - Utils.sleep(Duration.ofSeconds(1)); - } - }; - - try (var pool = - ThreadPool.builder() - .executors( - IntStream.range(0, 3) - .mapToObj(i -> executor(runnable)) - .collect(Collectors.toList())) - .build()) { - Utils.sleep(Duration.ofSeconds(1)); - } - return receivers; - } - @Test void testCloseReceiver() { var collector = BeanCollector.builder().clientCreator(clientCreator).build(); - var receivers = receivers(collector); + var receivers = + IntStream.range(0, 3) + .mapToObj( + ignored -> + collector + .register() + .host("unknown") + .port(100) + .fetcher(client -> List.of(createBeanObject())) + .build()) + .collect(Collectors.toUnmodifiableList()); receivers.forEach(Receiver::current); Assertions.assertEquals(1, collector.nodes.size()); @@ -186,17 +164,23 @@ void testMultiThreadsUpdate() { .clientCreator(clientCreator) .build(); - var receivers = receivers(collector); - - try (var pool = - ThreadPool.builder() - .executors( - receivers.stream() - .map(receiver -> executor(receiver::current)) - .collect(Collectors.toList())) - .build()) { - Utils.sleep(Duration.ofSeconds(3)); - } + var receivers = + IntStream.range(0, 3) + .mapToObj( + ignored -> + collector + .register() + .host("unknown") + .port(100) + .fetcher(client -> List.of(createBeanObject())) + .build()) + .collect(Collectors.toUnmodifiableList()); + + var fs = + receivers.stream() + .map(r -> CompletableFuture.runAsync(r::current)) + .collect(Collectors.toUnmodifiableList()); + Utils.swallowException(() -> Utils.sequence(fs).get()); receivers.forEach(r -> Assertions.assertEquals(1, r.current().size())); } diff --git a/app/src/test/java/org/astraea/app/partitioner/smooth/SmoothWeightRoundRobinDispatchTest.java b/app/src/test/java/org/astraea/app/partitioner/smooth/SmoothWeightRoundRobinDispatchTest.java index 0541073454..85956798a6 100644 --- a/app/src/test/java/org/astraea/app/partitioner/smooth/SmoothWeightRoundRobinDispatchTest.java +++ b/app/src/test/java/org/astraea/app/partitioner/smooth/SmoothWeightRoundRobinDispatchTest.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -36,9 +37,6 @@ import org.astraea.app.admin.NodeInfo; import org.astraea.app.admin.ReplicaInfo; import org.astraea.app.common.Utils; -import org.astraea.app.concurrent.Executor; -import org.astraea.app.concurrent.State; -import org.astraea.app.concurrent.ThreadPool; import org.astraea.app.consumer.Consumer; import org.astraea.app.consumer.Deserializer; import org.astraea.app.consumer.Header; @@ -142,19 +140,19 @@ var record = records.iterator().next(); } @Test - void testMultipleProducer() { + void testMultipleProducer() throws ExecutionException, InterruptedException { var topicName = "addr"; admin.creator().topic(topicName).numberOfPartitions(10).create(); var key = "tainan"; var timestamp = System.currentTimeMillis() + 10; var header = Header.of("a", "b".getBytes()); - try (var threadPool = - ThreadPool.builder() - .executors( - IntStream.range(0, 10) - .mapToObj( - i -> - producerExecutor( + + Utils.sequence( + IntStream.range(0, 10) + .mapToObj( + ignored -> + CompletableFuture.runAsync( + producerThread( Producer.builder() .keySerializer(Serializer.STRING) .configs( @@ -167,11 +165,10 @@ void testMultipleProducer() { topicName, key, header, - timestamp)) - .collect(Collectors.toUnmodifiableList())) - .build()) { - threadPool.waitAll(); - } + timestamp))) + .collect(Collectors.toUnmodifiableList())) + .get(); + try (var consumer = Consumer.forTopics(Set.of(topicName)) .bootstrapServers(bootstrapServers()) @@ -249,34 +246,22 @@ void testJmxConfig() { } } - private Executor producerExecutor( + private Runnable producerThread( Producer producer, String topic, String key, Header header, long timeStamp) { - return new Executor() { - int i = 0; - - @Override - public State execute() throws InterruptedException { - if (i > 99) return State.DONE; - try { + return () -> { + try (producer) { + var i = 0; + while (i <= 99) { producer .sender() .topic(topic) .key(key) .timestamp(timeStamp) .headers(List.of(header)) - .run() - .toCompletableFuture() - .get(); - } catch (ExecutionException e) { - e.printStackTrace(); + .run(); + i++; } - i++; - return State.RUNNING; - } - - @Override - public void close() { - producer.close(); + producer.flush(); } }; }