diff --git a/application/src/test/java/org/thingsboard/mqtt/broker/AbstractPubSubIntegrationTest.java b/application/src/test/java/org/thingsboard/mqtt/broker/AbstractPubSubIntegrationTest.java index ae2522b3c..bee87833d 100644 --- a/application/src/test/java/org/thingsboard/mqtt/broker/AbstractPubSubIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/mqtt/broker/AbstractPubSubIntegrationTest.java @@ -131,12 +131,10 @@ protected void after() { public static class ReplaceKafkaPropertiesBeanPostProcessor implements BeanPostProcessor { @Override public Object postProcessAfterInitialization(@NotNull Object bean, @NotNull String beanName) throws BeansException { - if (bean instanceof TbKafkaConsumerSettings) { - TbKafkaConsumerSettings kafkaSettings = (TbKafkaConsumerSettings) bean; + if (bean instanceof TbKafkaConsumerSettings kafkaSettings) { kafkaSettings.setServers(kafka.getBootstrapServers()); } - if (bean instanceof TbKafkaProducerSettings) { - TbKafkaProducerSettings kafkaSettings = (TbKafkaProducerSettings) bean; + if (bean instanceof TbKafkaProducerSettings kafkaSettings) { kafkaSettings.setServers(kafka.getBootstrapServers()); } if (bean instanceof TbKafkaAdminSettings kafkaAdminSettings) { diff --git a/application/src/test/java/org/thingsboard/mqtt/broker/service/integration/IntegrationTestInitService.java b/application/src/test/java/org/thingsboard/mqtt/broker/service/integration/IntegrationTestInitService.java index f86e30bc2..66157a2bd 100644 --- a/application/src/test/java/org/thingsboard/mqtt/broker/service/integration/IntegrationTestInitService.java +++ b/application/src/test/java/org/thingsboard/mqtt/broker/service/integration/IntegrationTestInitService.java @@ -15,6 +15,8 @@ */ package org.thingsboard.mqtt.broker.service.integration; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import net.jodah.concurrentunit.Waiter; import org.springframework.stereotype.Service; @@ -26,15 +28,28 @@ @Service public class IntegrationTestInitService { + public static final int SUBSCRIBERS_COUNT = 10; public static final int PUBLISHERS_COUNT = 5; public static final int PUBLISH_MSGS_COUNT = 100; + private ExecutorService executor; + + @PostConstruct + public void init() { + executor = Executors.newFixedThreadPool(PUBLISHERS_COUNT); + } + + @PreDestroy + public void shutdown() { + executor.shutdownNow(); + } + public void initPubSubTest(BiConsumer subscriberInitializer, BiConsumer publisherInitializer) throws Throwable { Waiter subscribersWaiter = new Waiter(); CountDownLatch connectingSubscribers = new CountDownLatch(SUBSCRIBERS_COUNT); - ExecutorService executor = Executors.newFixedThreadPool(PUBLISHERS_COUNT); + executor = Executors.newFixedThreadPool(PUBLISHERS_COUNT); for (int i = 0; i < SUBSCRIBERS_COUNT; i++) { int finalI = i; executor.execute(() -> { diff --git a/application/src/test/java/org/thingsboard/mqtt/broker/service/subscription/SubscriptionTriePerformanceTest.java b/application/src/test/java/org/thingsboard/mqtt/broker/service/subscription/SubscriptionTriePerformanceTest.java index 88129e17a..d9d9b9082 100644 --- a/application/src/test/java/org/thingsboard/mqtt/broker/service/subscription/SubscriptionTriePerformanceTest.java +++ b/application/src/test/java/org/thingsboard/mqtt/broker/service/subscription/SubscriptionTriePerformanceTest.java @@ -43,12 +43,12 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; -import java.util.stream.Collectors; import java.util.stream.IntStream; @Slf4j @RunWith(MockitoJUnitRunner.class) public class SubscriptionTriePerformanceTest { + private static final String SERVICE_ID = "serviceId"; private static final int FIRST_LEVEL_SEGMENTS = 50; @@ -85,8 +85,7 @@ public void testSingleThread() throws Exception { List> levelSuppliers = initializeLevelSuppliers(); List topicFilters = initializeTopicFilters(levelSuppliers); - List topics = topicFilters.stream().map(s -> s.replaceAll("[+#]", "test")).collect(Collectors.toList()); - + List topics = topicFilters.stream().map(s -> s.replaceAll("[+#]", "test")).toList(); fillSubscriptionTrie(topicFilters); @@ -105,7 +104,7 @@ public void testSingleThread() throws Exception { task.get(30, TimeUnit.SECONDS); long endTime = System.currentTimeMillis(); System.out.println("All took " + (endTime - startTime) + " ms"); - + executor.shutdownNow(); } @Test @@ -113,15 +112,13 @@ public void testMultipleThreads() throws Exception { List> levelSuppliers = initializeLevelSuppliers(); List topicFilters = initializeTopicFilters(levelSuppliers); - List topics = topicFilters.stream().map(s -> s.replaceAll("[+#]", "test")).collect(Collectors.toList()); + List topics = topicFilters.stream().map(s -> s.replaceAll("[+#]", "test")).toList(); fillSubscriptionTrie(topicFilters); ExecutorService subscribeExecutor = Executors.newSingleThreadExecutor(); CountDownLatch processingPublishers = new CountDownLatch(NUMBER_OF_THREADS); - subscribeExecutor.execute(() -> { - simulateSubscribers(topicFilters, processingPublishers); - }); + subscribeExecutor.execute(() -> simulateSubscribers(topicFilters, processingPublishers)); long startTime = System.currentTimeMillis(); ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS); @@ -142,6 +139,8 @@ public void testMultipleThreads() throws Exception { long endTime = System.currentTimeMillis(); System.out.println("All took " + (endTime - startTime) + " ms"); + subscribeExecutor.shutdownNow(); + executor.shutdownNow(); } private void simulateSubscribers(List topicFilters, CountDownLatch processingPublishers) { @@ -215,10 +214,10 @@ private List> initializeLevelSuppliers() { List> levelSuppliers = new ArrayList<>(MAX_LEVELS); List firstLevelSegments = IntStream.range(0, FIRST_LEVEL_SEGMENTS).boxed() .map(ignored -> UUID.randomUUID().toString().substring(0, 10)) - .collect(Collectors.toList()); + .toList(); List secondLevelSegments = IntStream.range(0, SECOND_LEVEL_SEGMENTS).boxed() .map(ignored -> UUID.randomUUID().toString().substring(0, 10)) - .collect(Collectors.toList()); + .toList(); levelSuppliers.add(() -> firstLevelSegments.get(r.nextInt(firstLevelSegments.size()))); levelSuppliers.add(() -> secondLevelSegments.get(r.nextInt(secondLevelSegments.size()))); for (int i = 0; i < MAX_LEVELS; i++) {