From 0642876e1de86767ac11567840bef9279026483c Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Sun, 25 Aug 2024 10:15:27 +0200 Subject: [PATCH] Improved subscription remove performance --- .../moquette/broker/subscriptions/CNode.java | 37 ++++++------------- .../broker/subscriptions/CTrieTest.java | 5 ++- 2 files changed, 15 insertions(+), 27 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java index a58a74794..232c74171 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java @@ -42,8 +42,8 @@ class CNode implements Comparable { public static final Random SECURE_RANDOM = new SecureRandom(); private final Token token; private PMap children; - // Map of subscriptions. Not a Set, because Set doesn't have a Get method and we may need to update. - private PMap subscriptions; + // Map of subscriptions per clientId. + private PMap subscriptions; // the list of SharedSubscription is sorted. The sort is necessary for fast access, instead of linear scan. private Map> sharedSubscriptions; @@ -55,7 +55,7 @@ class CNode implements Comparable { } //Copy constructor - private CNode(Token token, PMap children, PMap subscriptions, Map> sharedSubscriptions) { + private CNode(Token token, PMap children, PMap subscriptions, Map> sharedSubscriptions) { this.token = token; // keep reference, root comparison in directory logic relies on it for now. this.subscriptions = subscriptions; this.sharedSubscriptions = new HashMap<>(sharedSubscriptions); @@ -113,8 +113,8 @@ private List sharedSubscriptions() { return selectedSubscriptions; } - Set subscriptions() { - return subscriptions.keySet(); + Collection subscriptions() { + return subscriptions.values(); } // Mutating operation @@ -136,15 +136,15 @@ CNode addSubscription(SubscriptionRequest request) { final Subscription newSubscription = request.subscription(); // if already contains one with same topic and same client, keep that with higher QoS - final Subscription existing = subscriptions.get(newSubscription); + final Subscription existing = subscriptions.get(newSubscription.clientId); if (existing != null) { // Subscription already exists if (needsToUpdateExistingSubscription(newSubscription, existing)) { - subscriptions = subscriptions.plus(newSubscription, newSubscription); + subscriptions = subscriptions.plus(newSubscription.clientId, newSubscription); } } else { // insert into the expected index so that the sorting is maintained - subscriptions = subscriptions.plus(newSubscription, newSubscription); + subscriptions = subscriptions.plus(newSubscription.clientId, newSubscription); } } return this; @@ -170,8 +170,8 @@ private static boolean needsToUpdateExistingSubscription(Subscription newSubscri * AND at least one subscription is actually present for that clientId * */ boolean containsOnly(String clientId) { - for (Subscription sub : this.subscriptions.values()) { - if (!sub.clientId.equals(clientId)) { + for (String sub : this.subscriptions.keySet()) { + if (!sub.equals(clientId)) { return false; } } @@ -200,12 +200,7 @@ private static SharedSubscription wrapKey(String clientId) { //TODO this is equivalent to negate(containsOnly(clientId)) private boolean containsSubscriptionsForClient(String clientId) { - for (Subscription sub : this.subscriptions.values()) { - if (sub.clientId.equals(clientId)) { - return true; - } - } - return false; + return subscriptions.containsKey(clientId); } void removeSubscriptionsFor(UnsubscribeRequest request) { @@ -224,15 +219,7 @@ void removeSubscriptionsFor(UnsubscribeRequest request) { this.sharedSubscriptions.replace(request.getSharedName(), subscriptionsForName); } } else { - // collect Subscription instances to remove - Set toRemove = new HashSet<>(); - for (Subscription sub : this.subscriptions.values()) { - if (sub.clientId.equals(clientId)) { - toRemove.add(sub); - } - } - // effectively remove the instances - subscriptions = subscriptions.minusAll(toRemove); + subscriptions = subscriptions.minus(clientId); } } diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java index 2b42ab4ae..7a7221abe 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java @@ -26,6 +26,7 @@ import static io.moquette.broker.subscriptions.SubscriptionTestUtils.asSubscription; import static io.moquette.broker.subscriptions.Topic.asTopic; +import java.util.Collection; import java.util.List; import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; @@ -100,7 +101,7 @@ public void testAddNewSubscriptionOnExistingNode() { //Verify final Optional matchedNode = sut.lookup(asTopic("/temp")); assertTrue(matchedNode.isPresent(), "Node on path /temp must be present"); - final Set subscriptions = matchedNode.get().subscriptions(); + final Collection subscriptions = matchedNode.get().subscriptions(); assertTrue(subscriptions.contains(asSubscription("TempSensor2", "/temp"))); } @@ -118,7 +119,7 @@ public void testAddNewDeepNodes() { //Verify final Optional matchedNode = sut.lookup(asTopic("/italy/happiness")); assertTrue(matchedNode.isPresent(), "Node on path /italy/happiness must be present"); - final Set subscriptions = matchedNode.get().subscriptions(); + final Collection subscriptions = matchedNode.get().subscriptions(); assertTrue(subscriptions.contains(asSubscription("HappinessSensor", "/italy/happiness"))); }