Skip to content

Commit

Permalink
Experiment with artificial tree-deepening
Browse files Browse the repository at this point in the history
  • Loading branch information
hylkevds committed Jul 30, 2024
1 parent 4af3483 commit b8c68c8
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ private List<Subscription> recursiveMatch(Topic topicName, INode inode, int dept
// type #, + or exact match
Optional<INode> subInode = cnode.childOf(Token.MULTI);
if (subInode.isPresent()) {
Topic remainingRealTopic = (ROOT.equals(cnode.getToken())) ? topicName : topicName.exceptFullHeadToken();
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1));
}
subInode = cnode.childOf(Token.SINGLE);
Expand Down
14 changes: 14 additions & 0 deletions broker/src/main/java/io/moquette/broker/subscriptions/Token.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,29 @@ public class Token implements Comparable<Token> {
static final Token MULTI = new Token("#");
static final Token SINGLE = new Token("+");
final String name;
boolean lastSubToken;

protected Token(String s) {
this(s, true);
}

protected Token(String s, boolean isLastSub) {
name = s;
lastSubToken = isLastSub;
}

protected String name() {
return name;
}

protected void setLastSubToken(boolean lastSubToken) {
this.lastSubToken = lastSubToken;
}

protected boolean isLastSubToken() {
return lastSubToken;
}

@Override
public int hashCode() {
int hash = 7;
Expand Down
76 changes: 66 additions & 10 deletions broker/src/main/java/io/moquette/broker/subscriptions/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class Topic implements Serializable, Comparable<Topic> {

private static final Logger LOG = LoggerFactory.getLogger(Topic.class);

public static int MAX_TOKEN_LENGTH = 4;

private static final long serialVersionUID = 2438799283749822L;

private final String topic;
Expand All @@ -55,7 +57,7 @@ public Topic(String topic) {

Topic(List<Token> tokens) {
this.tokens = tokens;
List<String> strTokens = tokens.stream().map(Token::toString).collect(Collectors.toList());
List<String> strTokens = fullTokens().stream().map(Token::toString).collect(Collectors.toList());
this.topic = String.join("/", strTokens);
this.valid = true;
}
Expand All @@ -74,7 +76,24 @@ public List<Token> getTokens() {
return tokens;
}

private List<Token> parseTopic(String topic) throws ParseException {
public List<Token> fullTokens() {
List<Token> fullTokens = new ArrayList<>();
String currentToken = null;
for (Token token : getTokens()) {
if (currentToken == null) {
currentToken = token.name;
} else {
currentToken += token.name;
}
if (token.isLastSubToken()) {
fullTokens.add(new Token(currentToken, true));
currentToken = null;
}
}
return fullTokens;
}

private static List<Token> parseTopic(String topic) throws ParseException {
if (topic.length() == 0) {
throw new ParseException("Bad format of topic, topic MUST be at least 1 character [MQTT-4.7.3-1] and " +
"this was empty", 0);
Expand Down Expand Up @@ -117,7 +136,18 @@ private List<Token> parseTopic(String topic) throws ParseException {
} else if (s.contains("+")) {
throw new ParseException("Bad format of topic, invalid subtopic name: " + s, i);
} else {
res.add(new Token(s));
final int l = s.length();
int start = 0;
Token token = null;
while (start < l) {
int end = Math.min(start + MAX_TOKEN_LENGTH, l);
final String subToken = s.substring(start, end);
token = new Token(subToken, false);
res.add(token);
start = end;
}
// Can't be null because s can't be empty.
token.setLastSubToken(true);
}
}

Expand Down Expand Up @@ -151,6 +181,22 @@ public Topic exceptHeadToken() {
return new Topic(tokensCopy);
}

/**
* @return a new Topic corresponding to this less than the full head token, skipping any sub-tokens.
*/
public Topic exceptFullHeadToken() {
List<Token> tokens = getTokens();
if (tokens.isEmpty()) {
return new Topic(Collections.emptyList());
}
List<Token> tokensCopy = new ArrayList<>(tokens);
Token removed;
do {
removed = tokensCopy.remove(0);
} while (!removed.isLastSubToken() && !tokensCopy.isEmpty());
return new Topic(tokensCopy);
}

public boolean isValid() {
if (tokens == null)
getTokens();
Expand All @@ -169,27 +215,37 @@ public boolean isValid() {
public boolean match(Topic subscriptionTopic) {
List<Token> msgTokens = getTokens();
List<Token> subscriptionTokens = subscriptionTopic.getTokens();
// Due to sub-tokens and the + wildcard, indexes may differ.
int i = 0;
for (; i < subscriptionTokens.size(); i++) {
int m = 0;
for (; i < subscriptionTokens.size(); i++, m++) {
Token subToken = subscriptionTokens.get(i);
if (!Token.MULTI.equals(subToken) && !Token.SINGLE.equals(subToken)) {
if (i >= msgTokens.size()) {
if (m >= msgTokens.size()) {
return false;
}
Token msgToken = msgTokens.get(i);
Token msgToken = msgTokens.get(m);
if (!msgToken.equals(subToken)) {
return false;
}
} else {
if (Token.MULTI.equals(subToken)) {
return true;
}
// if (Token.SINGLE.equals(subToken)) {
// // skip a step forward
// }
if (m >= msgTokens.size()) {
return false;
}
if (Token.SINGLE.equals(subToken)) {
// skip to the next full token in the message topic
Token msgToken = msgTokens.get(m);
while (!msgToken.isLastSubToken()) {
m++;
msgToken = msgTokens.get(m);
}
}
}
}
return i == msgTokens.size();
return m == msgTokens.size();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,34 @@ static SubscriptionRequest clientSubOnTopic(String clientID, String topicName) {
@Test
@Timeout(value = MAX_DURATION_S)
public void testManyClientsFewTopics() {
List<SubscriptionRequest> subscriptionList = prepareSubscriptionsManyClientsFewTopic();

List<SubscriptionRequest> subscriptionList = prepareSubscriptionsManyClientsFewTopic(50_000);
createSubscriptions(subscriptionList);
}

@Test
@Timeout(value = MAX_DURATION_S)
public void testFlat() {
List<SubscriptionRequest> results = prepareSubscriptionsFlat();
createSubscriptions(results);
Topic.MAX_TOKEN_LENGTH = 1;
createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS));
Topic.MAX_TOKEN_LENGTH = 2;
createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS));
Topic.MAX_TOKEN_LENGTH = 3;
createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS));
Topic.MAX_TOKEN_LENGTH = 4;
createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS));
Topic.MAX_TOKEN_LENGTH = 5;
createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS));
Topic.MAX_TOKEN_LENGTH = 6;
createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS));
Topic.MAX_TOKEN_LENGTH = 7;
createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS));
}

@Test
@Timeout(value = MAX_DURATION_S)
public void testDeep() {
List<SubscriptionRequest> results = prepareSubscriptionsDeep();
List<SubscriptionRequest> results = prepareSubscriptionsDeep(TOTAL_SUBSCRIPTIONS);
createSubscriptions(results);
}

Expand All @@ -83,38 +96,45 @@ public void createSubscriptions(List<SubscriptionRequest> results) {
}
long end = System.currentTimeMillis();
long duration = end - start;
LOGGER.info("Added " + count + " subscriptions in " + duration + " ms (" + Math.round(1000.0 * count / duration) + "/s)");
final long speed = Math.round(1000.0 * count / duration);
LOGGER.info("{}: Added {} subscriptions in {} ms ({}/s)", Topic.MAX_TOKEN_LENGTH, count, duration, speed);
}

public List<SubscriptionRequest> prepareSubscriptionsManyClientsFewTopic() {
List<SubscriptionRequest> subscriptionList = new ArrayList<>(TOTAL_SUBSCRIPTIONS);
for (int i = 0; i < TOTAL_SUBSCRIPTIONS; i++) {
Topic topic = asTopic("topic/test/" + new Random().nextInt(1 + i % 10) + "/test");
public List<SubscriptionRequest> prepareSubscriptionsManyClientsFewTopic(int subCount) {
List<SubscriptionRequest> subscriptionList = new ArrayList<>(subCount);
long start = System.currentTimeMillis();
for (int i = 0; i < subCount; i++) {
Topic topic = asTopic("topic/test/" + new Random().nextInt(10) + "/test");
subscriptionList.add(SubscriptionRequest.buildNonShared("TestClient-" + i, topic, MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_LEAST_ONCE)));
}
long end = System.currentTimeMillis();
long duration = end - start;
LOGGER.debug("Prepared {} subscriptions in {} ms on 10 topics", subCount, duration);
return subscriptionList;
}

public List<SubscriptionRequest> prepareSubscriptionsFlat() {
List<SubscriptionRequest> results = new ArrayList<>(TOTAL_SUBSCRIPTIONS);
public List<SubscriptionRequest> prepareSubscriptionsFlat(int subCount) {
List<SubscriptionRequest> results = new ArrayList<>(subCount);
int count = 0;
long start = System.currentTimeMillis();
for (int topicNr = 0; topicNr < TOTAL_SUBSCRIPTIONS / 10; topicNr++) {
for (int clientNr = 0; clientNr < 10; clientNr++) {
final int clientCount = 1;
final int topicCount = subCount / clientCount;
for (int clientNr = 0; clientNr < clientCount; clientNr++) {
for (int topicNr = 0; topicNr < topicCount; topicNr++) {
count++;
results.add(clientSubOnTopic("Client-" + clientNr, "mainTopic-" + topicNr));
results.add(clientSubOnTopic("Client-" + clientNr, topicNr + "-mainTopic"));
}
}
long end = System.currentTimeMillis();
long duration = end - start;
LOGGER.info("Prepared {} subscriptions in {} ms", count, duration);
LOGGER.debug("Prepared {} subscriptions for {} topics in {} ms", count, topicCount, duration);
return results;
}

public List<SubscriptionRequest> prepareSubscriptionsDeep() {
List<SubscriptionRequest> results = new ArrayList<>(TOTAL_SUBSCRIPTIONS);
long countPerLevel = Math.round(Math.pow(TOTAL_SUBSCRIPTIONS, 0.25));
LOGGER.info("Preparing {} subscriptions, 4 deep with {} per level", TOTAL_SUBSCRIPTIONS, countPerLevel);
public List<SubscriptionRequest> prepareSubscriptionsDeep(int subCount) {
List<SubscriptionRequest> results = new ArrayList<>(subCount);
long countPerLevel = Math.round(Math.pow(subCount, 0.25));
LOGGER.info("Preparing {} subscriptions, 4 deep with {} per level", subCount, countPerLevel);
int count = 0;
long start = System.currentTimeMillis();
outerloop:
Expand All @@ -125,7 +145,7 @@ public List<SubscriptionRequest> prepareSubscriptionsDeep() {
count++;
results.add(clientSubOnTopic("Client-" + clientNr, "mainTopic-" + firstLevelNr + "/subTopic-" + secondLevelNr + "/subSubTopic" + thirdLevelNr));
// Due to the 4th-power-root we don't get exactly the required number of subs.
if (count >= TOTAL_SUBSCRIPTIONS) {
if (count >= subCount) {
break outerloop;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public TopicAssert doesNotMatch(String topic) {
}

public TopicAssert containsToken(Object... tokens) {
Assertions.assertThat(actual.getTokens()).containsExactly(asArray(tokens));
Assertions.assertThat(actual.fullTokens()).containsExactly(asArray(tokens));

return myself;
}
Expand Down

0 comments on commit b8c68c8

Please sign in to comment.