Skip to content

Commit

Permalink
minor fixes in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dmytro-landiak committed Nov 22, 2024
1 parent b711fd8 commit 9526699
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,10 @@ protected void after() {
public static class ReplaceKafkaPropertiesBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(@NotNull Object bean, @NotNull String beanName) throws BeansException {
if (bean instanceof TbKafkaConsumerSettings) {
TbKafkaConsumerSettings kafkaSettings = (TbKafkaConsumerSettings) bean;
if (bean instanceof TbKafkaConsumerSettings kafkaSettings) {
kafkaSettings.setServers(kafka.getBootstrapServers());
}
if (bean instanceof TbKafkaProducerSettings) {
TbKafkaProducerSettings kafkaSettings = (TbKafkaProducerSettings) bean;
if (bean instanceof TbKafkaProducerSettings kafkaSettings) {
kafkaSettings.setServers(kafka.getBootstrapServers());
}
if (bean instanceof TbKafkaAdminSettings kafkaAdminSettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.thingsboard.mqtt.broker.service.integration;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import net.jodah.concurrentunit.Waiter;
import org.springframework.stereotype.Service;

Expand All @@ -26,15 +28,28 @@

@Service
public class IntegrationTestInitService {

public static final int SUBSCRIBERS_COUNT = 10;
public static final int PUBLISHERS_COUNT = 5;
public static final int PUBLISH_MSGS_COUNT = 100;

private ExecutorService executor;

@PostConstruct
public void init() {
executor = Executors.newFixedThreadPool(PUBLISHERS_COUNT);
}

@PreDestroy
public void shutdown() {
executor.shutdownNow();
}

public void initPubSubTest(BiConsumer<Waiter, Integer> subscriberInitializer,
BiConsumer<Waiter, Integer> publisherInitializer) throws Throwable {
Waiter subscribersWaiter = new Waiter();
CountDownLatch connectingSubscribers = new CountDownLatch(SUBSCRIBERS_COUNT);
ExecutorService executor = Executors.newFixedThreadPool(PUBLISHERS_COUNT);
executor = Executors.newFixedThreadPool(PUBLISHERS_COUNT);
for (int i = 0; i < SUBSCRIBERS_COUNT; i++) {
int finalI = i;
executor.execute(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Slf4j
@RunWith(MockitoJUnitRunner.class)
public class SubscriptionTriePerformanceTest {

private static final String SERVICE_ID = "serviceId";

private static final int FIRST_LEVEL_SEGMENTS = 50;
Expand Down Expand Up @@ -85,8 +85,7 @@ public void testSingleThread() throws Exception {
List<Supplier<String>> levelSuppliers = initializeLevelSuppliers();

List<String> topicFilters = initializeTopicFilters(levelSuppliers);
List<String> topics = topicFilters.stream().map(s -> s.replaceAll("[+#]", "test")).collect(Collectors.toList());

List<String> topics = topicFilters.stream().map(s -> s.replaceAll("[+#]", "test")).toList();

fillSubscriptionTrie(topicFilters);

Expand All @@ -105,23 +104,21 @@ public void testSingleThread() throws Exception {
task.get(30, TimeUnit.SECONDS);
long endTime = System.currentTimeMillis();
System.out.println("All took " + (endTime - startTime) + " ms");

executor.shutdownNow();
}

@Test
public void testMultipleThreads() throws Exception {
List<Supplier<String>> levelSuppliers = initializeLevelSuppliers();

List<String> topicFilters = initializeTopicFilters(levelSuppliers);
List<String> topics = topicFilters.stream().map(s -> s.replaceAll("[+#]", "test")).collect(Collectors.toList());
List<String> topics = topicFilters.stream().map(s -> s.replaceAll("[+#]", "test")).toList();

fillSubscriptionTrie(topicFilters);

ExecutorService subscribeExecutor = Executors.newSingleThreadExecutor();
CountDownLatch processingPublishers = new CountDownLatch(NUMBER_OF_THREADS);
subscribeExecutor.execute(() -> {
simulateSubscribers(topicFilters, processingPublishers);
});
subscribeExecutor.execute(() -> simulateSubscribers(topicFilters, processingPublishers));
long startTime = System.currentTimeMillis();

ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
Expand All @@ -142,6 +139,8 @@ public void testMultipleThreads() throws Exception {
long endTime = System.currentTimeMillis();
System.out.println("All took " + (endTime - startTime) + " ms");

subscribeExecutor.shutdownNow();
executor.shutdownNow();
}

private void simulateSubscribers(List<String> topicFilters, CountDownLatch processingPublishers) {
Expand Down Expand Up @@ -215,10 +214,10 @@ private List<Supplier<String>> initializeLevelSuppliers() {
List<Supplier<String>> levelSuppliers = new ArrayList<>(MAX_LEVELS);
List<String> firstLevelSegments = IntStream.range(0, FIRST_LEVEL_SEGMENTS).boxed()
.map(ignored -> UUID.randomUUID().toString().substring(0, 10))
.collect(Collectors.toList());
.toList();
List<String> secondLevelSegments = IntStream.range(0, SECOND_LEVEL_SEGMENTS).boxed()
.map(ignored -> UUID.randomUUID().toString().substring(0, 10))
.collect(Collectors.toList());
.toList();
levelSuppliers.add(() -> firstLevelSegments.get(r.nextInt(firstLevelSegments.size())));
levelSuppliers.add(() -> secondLevelSegments.get(r.nextInt(secondLevelSegments.size())));
for (int i = 0; i < MAX_LEVELS; i++) {
Expand Down

0 comments on commit 9526699

Please sign in to comment.