Skip to content

Commit

Permalink
topic alias context concurrent update fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dmytro-landiak committed Nov 18, 2024
1 parent 9d3de7e commit 2a74573
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Getter
Expand All @@ -40,6 +41,8 @@ public class TopicAliasCtx {
private final int maxTopicAlias;
private final ConcurrentMap<Integer, String> clientMappings;
private final ConcurrentMap<String, Integer> serverMappings;
private final AtomicInteger clientMappingsCount;
private final AtomicInteger serverMappingsCount;

public TopicAliasCtx(boolean enabled, int maxTopicAlias) {
this(enabled, maxTopicAlias, new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
Expand All @@ -52,6 +55,8 @@ public TopicAliasCtx(boolean enabled, int maxTopicAlias,
this.maxTopicAlias = maxTopicAlias;
this.clientMappings = clientMappings;
this.serverMappings = serverMappings;
this.clientMappingsCount = clientMappings != null ? new AtomicInteger(clientMappings.size()) : null;
this.serverMappingsCount = serverMappings != null ? new AtomicInteger(serverMappings.size()) : null;
}

public String getTopicNameByAlias(PublishMsg publishMsg) {
Expand Down Expand Up @@ -148,12 +153,12 @@ private String getTopicByAlias(int topicAlias) {

private void saveMapping(int topicAlias, String topicName) {
clientMappings.put(topicAlias, topicName);
clientMappingsCount.incrementAndGet();
}

int getNextTopicAlias(String topicName) {
if (isMoreTopicAliasAvailable()) {
int lastTopicAlias = serverMappings.size();
int nextTopicAlias = lastTopicAlias + 1;
int nextTopicAlias = serverMappingsCount.incrementAndGet();
serverMappings.put(topicName, nextTopicAlias);
return nextTopicAlias;
}
Expand All @@ -165,6 +170,6 @@ private boolean isMoreTopicAliasAvailable() {
}

private int currentTopicAliasesCount() {
return clientMappings.size() + serverMappings.size();
return clientMappingsCount.get() + serverMappingsCount.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.thingsboard.mqtt.broker.session;

import io.netty.handler.codec.mqtt.MqttProperties;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
Expand All @@ -24,8 +25,15 @@
import org.thingsboard.mqtt.broker.gen.queue.QueueProtos;
import org.thingsboard.mqtt.broker.service.mqtt.PublishMsg;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TopicAliasCtxTest {

Expand Down Expand Up @@ -331,6 +339,34 @@ public void givenMaxAliasesNotReached_whenGetNextTopicAlias_thenReturnNewAlias()
Assert.assertEquals(2, topicAlias);
}

@Test
public void givenTopicAlias_whenGetNextTopicAliasConcurrently_thenReturnExpectedResult() throws InterruptedException {
topicAliasCtx = new TopicAliasCtx(true, 100);

ExecutorService executorService = Executors.newFixedThreadPool(4);

CountDownLatch latch = new CountDownLatch(10);
List<Integer> allTopicAliases = new CopyOnWriteArrayList<>();
List<Integer> expectedTopicAliases = new ArrayList<>();

for (int i = 0; i < 10; i++) {
expectedTopicAliases.add(i + 1);
executorService.submit(() -> {
int topicAlias = topicAliasCtx.getNextTopicAlias(RandomStringUtils.randomAlphabetic(10));
allTopicAliases.add(topicAlias);
latch.countDown();
});
}
boolean awaitResult = latch.await(10, TimeUnit.SECONDS);

Assert.assertTrue(awaitResult);
Assert.assertEquals(10, allTopicAliases.size());
Assert.assertEquals(10, expectedTopicAliases.size());
Assert.assertTrue(allTopicAliases.containsAll(expectedTopicAliases));

executorService.shutdownNow();
}

@Test
public void givenAliasIsLessThanMaxAlias_whenValidateTopicAlias_thenSuccess() {
topicAliasCtx = new TopicAliasCtx(true, 2);
Expand Down

0 comments on commit 2a74573

Please sign in to comment.