From 6fa111643e52bbe43f8985d371aa4bc0895832b2 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Wed, 12 Jun 2024 09:41:53 +0200 Subject: [PATCH 1/9] Fixes 841: cleanTomb used compareAndSet to update a reference, but incorrectly re-fetched the 'original' instead of using the version that was used to make the copy. --- ChangeLog.txt | 1 + .../java/io/moquette/broker/subscriptions/CTrie.java | 10 ++++------ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index 2128e61a3..49043c44b 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,5 @@ Version 0.18-SNAPSHOT: + [fix] Incorrect reference used in compareAndSet in CTrie.cleanTomb. (#841) [feature] Implement response-information property for request-response flow. (#840) [fix] Optimised page file opening for disk-based queues. (#837) [feature] Manage payload format indicator property, when set verify payload format. (#826) diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java index 874b0f49a..cd3038a1e 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java @@ -360,10 +360,7 @@ private Action remove(String clientId, Topic topic, INode inode, INode iParent, } } if (cnode instanceof TNode) { - // this inode is a tomb, has no clients and should be cleaned up - // Because we implemented cleanTomb below, this should be rare, but possible - // Consider calling cleanTomb here too - return Action.OK; + return cleanTomb(inode, iParent); } if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) { // last client to leave this node, AND there are no downstream children, remove via TNode tomb @@ -393,9 +390,10 @@ private Action remove(String clientId, Topic topic, INode inode, INode iParent, * @return REPEAT if this method wasn't successful or OK. */ private Action cleanTomb(INode inode, INode iParent) { - CNode updatedCnode = iParent.mainNode().copy(); + CNode origCnode = iParent.mainNode(); + CNode updatedCnode = origCnode.copy(); updatedCnode.remove(inode); - return iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? Action.OK : Action.REPEAT; + return iParent.compareAndSet(origCnode, updatedCnode) ? Action.OK : Action.REPEAT; } public int size() { From 4af34830e390c3b0c69d0fd77519fef6afb06300 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Thu, 13 Jun 2024 13:29:42 +0200 Subject: [PATCH 2/9] Fixed possible race condition in cleanTomb that could result in the removal of a live node --- .../java/io/moquette/broker/subscriptions/CNode.java | 4 ++-- .../java/io/moquette/broker/subscriptions/CTrie.java | 11 +++++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java index 00c813636..faa349aa7 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java @@ -99,9 +99,9 @@ public void add(INode newINode) { } } - public void remove(INode node) { + public INode remove(INode node) { int idx = findIndexForToken(node.mainNode().token); - this.children.remove(idx); + return this.children.remove(idx); } private List sharedSubscriptions() { diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java index cd3038a1e..a71d67961 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java @@ -392,8 +392,15 @@ private Action remove(String clientId, Topic topic, INode inode, INode iParent, private Action cleanTomb(INode inode, INode iParent) { CNode origCnode = iParent.mainNode(); CNode updatedCnode = origCnode.copy(); - updatedCnode.remove(inode); - return iParent.compareAndSet(origCnode, updatedCnode) ? Action.OK : Action.REPEAT; + INode removed = updatedCnode.remove(inode); + if (removed == inode) { + return iParent.compareAndSet(origCnode, updatedCnode) ? Action.OK : Action.REPEAT; + } else { + // The node removed (from the copy!) was not the node we expected to remove. + // Probably because another thread replaced the TNode with a live node, so + // we don't need to clean it and can return success. + return Action.OK; + } } public int size() { From b8c68c812ac7b41ce10b033223c2b69d10dee919 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Tue, 30 Jul 2024 13:54:17 +0200 Subject: [PATCH 3/9] Experiment with artificial tree-deepening --- .../moquette/broker/subscriptions/CTrie.java | 1 + .../moquette/broker/subscriptions/Token.java | 14 ++++ .../moquette/broker/subscriptions/Topic.java | 76 ++++++++++++++++--- .../broker/subscriptions/CTrieSpeedTest.java | 60 ++++++++++----- .../broker/subscriptions/TopicTest.java | 2 +- 5 files changed, 122 insertions(+), 31 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java index a71d67961..6143c1a47 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java @@ -254,6 +254,7 @@ private List recursiveMatch(Topic topicName, INode inode, int dept // type #, + or exact match Optional subInode = cnode.childOf(Token.MULTI); if (subInode.isPresent()) { + Topic remainingRealTopic = (ROOT.equals(cnode.getToken())) ? topicName : topicName.exceptFullHeadToken(); subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1)); } subInode = cnode.childOf(Token.SINGLE); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/Token.java b/broker/src/main/java/io/moquette/broker/subscriptions/Token.java index ca3544ed1..64bc8cf27 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/Token.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/Token.java @@ -25,15 +25,29 @@ public class Token implements Comparable { static final Token MULTI = new Token("#"); static final Token SINGLE = new Token("+"); final String name; + boolean lastSubToken; protected Token(String s) { + this(s, true); + } + + protected Token(String s, boolean isLastSub) { name = s; + lastSubToken = isLastSub; } protected String name() { return name; } + protected void setLastSubToken(boolean lastSubToken) { + this.lastSubToken = lastSubToken; + } + + protected boolean isLastSubToken() { + return lastSubToken; + } + @Override public int hashCode() { int hash = 7; diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java b/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java index 96940e116..3778f6571 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java @@ -31,6 +31,8 @@ public class Topic implements Serializable, Comparable { private static final Logger LOG = LoggerFactory.getLogger(Topic.class); + public static int MAX_TOKEN_LENGTH = 4; + private static final long serialVersionUID = 2438799283749822L; private final String topic; @@ -55,7 +57,7 @@ public Topic(String topic) { Topic(List tokens) { this.tokens = tokens; - List strTokens = tokens.stream().map(Token::toString).collect(Collectors.toList()); + List strTokens = fullTokens().stream().map(Token::toString).collect(Collectors.toList()); this.topic = String.join("/", strTokens); this.valid = true; } @@ -74,7 +76,24 @@ public List getTokens() { return tokens; } - private List parseTopic(String topic) throws ParseException { + public List fullTokens() { + List fullTokens = new ArrayList<>(); + String currentToken = null; + for (Token token : getTokens()) { + if (currentToken == null) { + currentToken = token.name; + } else { + currentToken += token.name; + } + if (token.isLastSubToken()) { + fullTokens.add(new Token(currentToken, true)); + currentToken = null; + } + } + return fullTokens; + } + + private static List parseTopic(String topic) throws ParseException { if (topic.length() == 0) { throw new ParseException("Bad format of topic, topic MUST be at least 1 character [MQTT-4.7.3-1] and " + "this was empty", 0); @@ -117,7 +136,18 @@ private List parseTopic(String topic) throws ParseException { } else if (s.contains("+")) { throw new ParseException("Bad format of topic, invalid subtopic name: " + s, i); } else { - res.add(new Token(s)); + final int l = s.length(); + int start = 0; + Token token = null; + while (start < l) { + int end = Math.min(start + MAX_TOKEN_LENGTH, l); + final String subToken = s.substring(start, end); + token = new Token(subToken, false); + res.add(token); + start = end; + } + // Can't be null because s can't be empty. + token.setLastSubToken(true); } } @@ -151,6 +181,22 @@ public Topic exceptHeadToken() { return new Topic(tokensCopy); } + /** + * @return a new Topic corresponding to this less than the full head token, skipping any sub-tokens. + */ + public Topic exceptFullHeadToken() { + List tokens = getTokens(); + if (tokens.isEmpty()) { + return new Topic(Collections.emptyList()); + } + List tokensCopy = new ArrayList<>(tokens); + Token removed; + do { + removed = tokensCopy.remove(0); + } while (!removed.isLastSubToken() && !tokensCopy.isEmpty()); + return new Topic(tokensCopy); + } + public boolean isValid() { if (tokens == null) getTokens(); @@ -169,14 +215,16 @@ public boolean isValid() { public boolean match(Topic subscriptionTopic) { List msgTokens = getTokens(); List subscriptionTokens = subscriptionTopic.getTokens(); + // Due to sub-tokens and the + wildcard, indexes may differ. int i = 0; - for (; i < subscriptionTokens.size(); i++) { + int m = 0; + for (; i < subscriptionTokens.size(); i++, m++) { Token subToken = subscriptionTokens.get(i); if (!Token.MULTI.equals(subToken) && !Token.SINGLE.equals(subToken)) { - if (i >= msgTokens.size()) { + if (m >= msgTokens.size()) { return false; } - Token msgToken = msgTokens.get(i); + Token msgToken = msgTokens.get(m); if (!msgToken.equals(subToken)) { return false; } @@ -184,12 +232,20 @@ public boolean match(Topic subscriptionTopic) { if (Token.MULTI.equals(subToken)) { return true; } -// if (Token.SINGLE.equals(subToken)) { -// // skip a step forward -// } + if (m >= msgTokens.size()) { + return false; + } + if (Token.SINGLE.equals(subToken)) { + // skip to the next full token in the message topic + Token msgToken = msgTokens.get(m); + while (!msgToken.isLastSubToken()) { + m++; + msgToken = msgTokens.get(m); + } + } } } - return i == msgTokens.size(); + return m == msgTokens.size(); } @Override diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java index 60660cbfd..98e0de858 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java @@ -44,21 +44,34 @@ static SubscriptionRequest clientSubOnTopic(String clientID, String topicName) { @Test @Timeout(value = MAX_DURATION_S) public void testManyClientsFewTopics() { - List subscriptionList = prepareSubscriptionsManyClientsFewTopic(); + + List subscriptionList = prepareSubscriptionsManyClientsFewTopic(50_000); createSubscriptions(subscriptionList); } @Test @Timeout(value = MAX_DURATION_S) public void testFlat() { - List results = prepareSubscriptionsFlat(); - createSubscriptions(results); + Topic.MAX_TOKEN_LENGTH = 1; + createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); + Topic.MAX_TOKEN_LENGTH = 2; + createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); + Topic.MAX_TOKEN_LENGTH = 3; + createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); + Topic.MAX_TOKEN_LENGTH = 4; + createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); + Topic.MAX_TOKEN_LENGTH = 5; + createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); + Topic.MAX_TOKEN_LENGTH = 6; + createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); + Topic.MAX_TOKEN_LENGTH = 7; + createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); } @Test @Timeout(value = MAX_DURATION_S) public void testDeep() { - List results = prepareSubscriptionsDeep(); + List results = prepareSubscriptionsDeep(TOTAL_SUBSCRIPTIONS); createSubscriptions(results); } @@ -83,38 +96,45 @@ public void createSubscriptions(List results) { } long end = System.currentTimeMillis(); long duration = end - start; - LOGGER.info("Added " + count + " subscriptions in " + duration + " ms (" + Math.round(1000.0 * count / duration) + "/s)"); + final long speed = Math.round(1000.0 * count / duration); + LOGGER.info("{}: Added {} subscriptions in {} ms ({}/s)", Topic.MAX_TOKEN_LENGTH, count, duration, speed); } - public List prepareSubscriptionsManyClientsFewTopic() { - List subscriptionList = new ArrayList<>(TOTAL_SUBSCRIPTIONS); - for (int i = 0; i < TOTAL_SUBSCRIPTIONS; i++) { - Topic topic = asTopic("topic/test/" + new Random().nextInt(1 + i % 10) + "/test"); + public List prepareSubscriptionsManyClientsFewTopic(int subCount) { + List subscriptionList = new ArrayList<>(subCount); + long start = System.currentTimeMillis(); + for (int i = 0; i < subCount; i++) { + Topic topic = asTopic("topic/test/" + new Random().nextInt(10) + "/test"); subscriptionList.add(SubscriptionRequest.buildNonShared("TestClient-" + i, topic, MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_LEAST_ONCE))); } + long end = System.currentTimeMillis(); + long duration = end - start; + LOGGER.debug("Prepared {} subscriptions in {} ms on 10 topics", subCount, duration); return subscriptionList; } - public List prepareSubscriptionsFlat() { - List results = new ArrayList<>(TOTAL_SUBSCRIPTIONS); + public List prepareSubscriptionsFlat(int subCount) { + List results = new ArrayList<>(subCount); int count = 0; long start = System.currentTimeMillis(); - for (int topicNr = 0; topicNr < TOTAL_SUBSCRIPTIONS / 10; topicNr++) { - for (int clientNr = 0; clientNr < 10; clientNr++) { + final int clientCount = 1; + final int topicCount = subCount / clientCount; + for (int clientNr = 0; clientNr < clientCount; clientNr++) { + for (int topicNr = 0; topicNr < topicCount; topicNr++) { count++; - results.add(clientSubOnTopic("Client-" + clientNr, "mainTopic-" + topicNr)); + results.add(clientSubOnTopic("Client-" + clientNr, topicNr + "-mainTopic")); } } long end = System.currentTimeMillis(); long duration = end - start; - LOGGER.info("Prepared {} subscriptions in {} ms", count, duration); + LOGGER.debug("Prepared {} subscriptions for {} topics in {} ms", count, topicCount, duration); return results; } - public List prepareSubscriptionsDeep() { - List results = new ArrayList<>(TOTAL_SUBSCRIPTIONS); - long countPerLevel = Math.round(Math.pow(TOTAL_SUBSCRIPTIONS, 0.25)); - LOGGER.info("Preparing {} subscriptions, 4 deep with {} per level", TOTAL_SUBSCRIPTIONS, countPerLevel); + public List prepareSubscriptionsDeep(int subCount) { + List results = new ArrayList<>(subCount); + long countPerLevel = Math.round(Math.pow(subCount, 0.25)); + LOGGER.info("Preparing {} subscriptions, 4 deep with {} per level", subCount, countPerLevel); int count = 0; long start = System.currentTimeMillis(); outerloop: @@ -125,7 +145,7 @@ public List prepareSubscriptionsDeep() { count++; results.add(clientSubOnTopic("Client-" + clientNr, "mainTopic-" + firstLevelNr + "/subTopic-" + secondLevelNr + "/subSubTopic" + thirdLevelNr)); // Due to the 4th-power-root we don't get exactly the required number of subs. - if (count >= TOTAL_SUBSCRIPTIONS) { + if (count >= subCount) { break outerloop; } } diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/TopicTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/TopicTest.java index 134213f6a..49ee4d565 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/TopicTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/TopicTest.java @@ -145,7 +145,7 @@ public TopicAssert doesNotMatch(String topic) { } public TopicAssert containsToken(Object... tokens) { - Assertions.assertThat(actual.getTokens()).containsExactly(asArray(tokens)); + Assertions.assertThat(actual.fullTokens()).containsExactly(asArray(tokens)); return myself; } From 808b32dccf444da542ef6b124c1d3594a3946672 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Sat, 3 Aug 2024 14:52:47 +0200 Subject: [PATCH 4/9] Fixed matching + and # containing topics --- .../java/io/moquette/broker/subscriptions/CTrie.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java index 6143c1a47..9cbbc9ef6 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java @@ -247,14 +247,21 @@ private List recursiveMatch(Topic topicName, INode inode, int dept if (action == NavigationAction.STOP) { return Collections.emptyList(); } - Topic remainingTopic = (ROOT.equals(cnode.getToken())) ? topicName : topicName.exceptHeadToken(); + final boolean isRoot = ROOT.equals(cnode.getToken()); + final boolean isSingle = Token.SINGLE.equals(cnode.getToken()); + final boolean isMulti = Token.MULTI.equals(cnode.getToken()); + + Topic remainingTopic = isRoot + ? topicName + : (isSingle || isMulti) + ? topicName.exceptFullHeadToken() + : topicName.exceptHeadToken(); List subscriptions = new ArrayList<>(); // We should only consider the maximum three children children of // type #, + or exact match Optional subInode = cnode.childOf(Token.MULTI); if (subInode.isPresent()) { - Topic remainingRealTopic = (ROOT.equals(cnode.getToken())) ? topicName : topicName.exceptFullHeadToken(); subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1)); } subInode = cnode.childOf(Token.SINGLE); From cb4469da4f6309a51629b680b1cd7d3359b88b10 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Sat, 3 Aug 2024 16:13:56 +0200 Subject: [PATCH 5/9] Disable really slow test for now --- .../broker/subscriptions/CTrieSpeedTest.java | 161 +++++++++++++----- 1 file changed, 120 insertions(+), 41 deletions(-) diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java index 98e0de858..97aa5e596 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java @@ -24,6 +24,10 @@ import java.util.Random; import io.netty.handler.codec.mqtt.MqttSubscriptionOption; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; @@ -33,71 +37,131 @@ public class CTrieSpeedTest { private static final Logger LOGGER = LoggerFactory.getLogger(CTrieSpeedTest.class.getName()); - private static final int MAX_DURATION_S = 5 * 60; - private static final int CHECK_INTERVAL = 50_000; + private static final int MAX_DURATION_S = 50 * 60; + private static final int CHECK_INTERVAL = 5000_000; private static final int TOTAL_SUBSCRIPTIONS = 500_000; + private static final Map>> TEST_RESULTS = new TreeMap<>(); + + private static void addResult(TestResult result) { + TEST_RESULTS.computeIfAbsent(result.threads, t -> new TreeMap<>()) + .computeIfAbsent(result.maxLength, t -> new ArrayList<>()) + .add(result); + } + + private static void logResults() { + StringBuilder header = new StringBuilder(); + TreeMap rowPerLength = new TreeMap<>(); + for (Map.Entry>> entry : TEST_RESULTS.entrySet()) { + Integer threads = entry.getKey(); + Map> lengthMap = entry.getValue(); + header.append(',').append(threads); + for (Map.Entry> innerEntry : lengthMap.entrySet()) { + Integer length = innerEntry.getKey(); + List results = innerEntry.getValue(); + int count = results.size(); + double durationAvg = 0; + for (TestResult result : results) { + durationAvg += 1.0 * result.durationMs / count; + } + rowPerLength.computeIfAbsent(length, t -> new StringBuilder("" + t)).append(',').append(durationAvg); + } + } + LOGGER.info(header.toString()); + for (StringBuilder row : rowPerLength.values()) { + LOGGER.info("{}", row); + } + } + static SubscriptionRequest clientSubOnTopic(String clientID, String topicName) { return SubscriptionRequest.buildNonShared(clientID, asTopic(topicName), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); } @Test @Timeout(value = MAX_DURATION_S) - public void testManyClientsFewTopics() { - + public void testManyClientsFewTopics() throws InterruptedException { + LOGGER.info("testManyClientsFewTopics"); List subscriptionList = prepareSubscriptionsManyClientsFewTopic(50_000); - createSubscriptions(subscriptionList); + createSubscriptions(subscriptionList, 1); } @Test - @Timeout(value = MAX_DURATION_S) - public void testFlat() { - Topic.MAX_TOKEN_LENGTH = 1; - createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); - Topic.MAX_TOKEN_LENGTH = 2; - createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); - Topic.MAX_TOKEN_LENGTH = 3; - createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); - Topic.MAX_TOKEN_LENGTH = 4; - createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); - Topic.MAX_TOKEN_LENGTH = 5; - createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); - Topic.MAX_TOKEN_LENGTH = 6; - createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); - Topic.MAX_TOKEN_LENGTH = 7; - createSubscriptions(prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS)); + @Disabled + public void testFlat() throws Exception { + LOGGER.info("TestFlat"); + TEST_RESULTS.clear(); + Callable> subCreate = () -> prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS); + for (int length : new int[]{1, 2, 3, 4, 5, 6}) { + for (int threads : new int[]{1, 2, 4, 6, 8, 12, 16}) { + test(subCreate, threads, length); + logResults(); + } + } + logResults(); + } + + private void test(Callable> subCreate, int threadCount, int maxLength) throws Exception { + Topic.MAX_TOKEN_LENGTH = maxLength; + createSubscriptions(subCreate.call(), threadCount); + createSubscriptions(subCreate.call(), threadCount); } @Test @Timeout(value = MAX_DURATION_S) - public void testDeep() { + public void testDeep() throws InterruptedException { + LOGGER.info("TestDeep"); List results = prepareSubscriptionsDeep(TOTAL_SUBSCRIPTIONS); - createSubscriptions(results); + createSubscriptions(results, 1); } - public void createSubscriptions(List results) { - int count = 0; + public void createSubscriptions(List results, int threadCount) throws InterruptedException { long start = System.currentTimeMillis(); - int log = CHECK_INTERVAL; CTrie tree = new CTrie(); - for (SubscriptionRequest result : results) { - tree.addToTree(result); - count++; - log--; - if (log <= 0) { - log = CHECK_INTERVAL; - long end = System.currentTimeMillis(); - long duration = end - start; - LOGGER.info("Added {} subscriptions in {} ms ({}/ms)", count, duration, Math.round(1.0 * count / duration)); - } - if (Thread.currentThread().isInterrupted()) { - return; - } + List> subLists = new ArrayList<>(); + final int total = results.size(); + int perList = total / threadCount; + int startIdx = 0; + for (int idx = 0; idx < threadCount - 1; idx++) { + int endIdx = startIdx + perList; + subLists.add(results.subList(startIdx, endIdx)); + startIdx = endIdx; + } + subLists.add(results.subList(startIdx, total - 1)); + + List threads = new ArrayList<>(); + for (int idx = 0; idx < threadCount; idx++) { + final List workList = subLists.get(idx); + threads.add(new Thread(() -> { + int log = CHECK_INTERVAL; + int count = 0; + for (SubscriptionRequest result : workList) { + tree.addToTree(result); + count++; + log--; + if (log <= 0) { + log = CHECK_INTERVAL; + long end = System.currentTimeMillis(); + long duration = end - start; + LOGGER.info("Threads {}, Subs {}, time(ms) {}, perMs {}", threadCount, count, duration, Math.round(1.0 * count / duration)); + } + if (Thread.currentThread().isInterrupted()) { + return; + } + } + })); + } + for (Thread thread : threads) { + thread.start(); } + for (Thread thread : threads) { + thread.join(); + } + long end = System.currentTimeMillis(); long duration = end - start; - final long speed = Math.round(1000.0 * count / duration); - LOGGER.info("{}: Added {} subscriptions in {} ms ({}/s)", Topic.MAX_TOKEN_LENGTH, count, duration, speed); + final long speed = Math.round(1000.0 * total / duration); + LOGGER.info("{}, {}, {}, {}, {}", threadCount, Topic.MAX_TOKEN_LENGTH, total, duration, speed); + addResult(new TestResult(threadCount, Topic.MAX_TOKEN_LENGTH, total, duration)); } public List prepareSubscriptionsManyClientsFewTopic(int subCount) { @@ -158,4 +222,19 @@ public List prepareSubscriptionsDeep(int subCount) { return results; } + private static class TestResult { + + public final int threads; + public final int maxLength; + public final int topicCount; + public final long durationMs; + + public TestResult(int threads, int maxLength, int topicCount, long durationMs) { + this.threads = threads; + this.maxLength = maxLength; + this.topicCount = topicCount; + this.durationMs = durationMs; + } + + } } From be3fc456d2e12819d2c17e0de7d72a5a5f2d9470 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Tue, 20 Aug 2024 14:34:00 +0200 Subject: [PATCH 6/9] Added Read and Remove performance tests for CTree --- .../broker/subscriptions/CTrieSpeedTest.java | 218 +++++++++++++++--- 1 file changed, 185 insertions(+), 33 deletions(-) diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java index 97aa5e596..486a6607e 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java @@ -27,9 +27,9 @@ import java.util.Map; import java.util.TreeMap; import java.util.concurrent.Callable; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,20 +39,49 @@ public class CTrieSpeedTest { private static final int MAX_DURATION_S = 50 * 60; private static final int CHECK_INTERVAL = 5000_000; - private static final int TOTAL_SUBSCRIPTIONS = 500_000; + private static final int TOTAL_SUBSCRIPTIONS = 1_000_000; - private static final Map>> TEST_RESULTS = new TreeMap<>(); + private static final Map>> TEST_RESULTS_ADD = new TreeMap<>(); + private static final Map>> TEST_RESULTS_READ = new TreeMap<>(); + private static final Map>> TEST_RESULTS_REMOVE = new TreeMap<>(); - private static void addResult(TestResult result) { - TEST_RESULTS.computeIfAbsent(result.threads, t -> new TreeMap<>()) + private static void addAddResult(TestResult result) { + addResult(TEST_RESULTS_ADD, result); + } + + private static void addReadResult(TestResult result) { + addResult(TEST_RESULTS_READ, result); + } + + private static void addRemoveResult(TestResult result) { + addResult(TEST_RESULTS_REMOVE, result); + } + + private static void addResult(Map>> set, TestResult result) { + set.computeIfAbsent(result.threads, t -> new TreeMap<>()) .computeIfAbsent(result.maxLength, t -> new ArrayList<>()) .add(result); } private static void logResults() { + LOGGER.info("Results for Adding:"); + logResults(TEST_RESULTS_ADD); + LOGGER.info("Results for Reading:"); + logResults(TEST_RESULTS_READ); + LOGGER.info("Results for Removing:"); + logResults(TEST_RESULTS_REMOVE); + } + + private static void clearResults() { + TEST_RESULTS_ADD.clear(); + TEST_RESULTS_READ.clear(); + TEST_RESULTS_REMOVE.clear(); + } + + private static void logResults(Map>> set) { StringBuilder header = new StringBuilder(); TreeMap rowPerLength = new TreeMap<>(); - for (Map.Entry>> entry : TEST_RESULTS.entrySet()) { + for (Map.Entry>> entry : set.entrySet()) { Integer threads = entry.getKey(); Map> lengthMap = entry.getValue(); header.append(',').append(threads); @@ -64,7 +93,7 @@ private static void logResults() { for (TestResult result : results) { durationAvg += 1.0 * result.durationMs / count; } - rowPerLength.computeIfAbsent(length, t -> new StringBuilder("" + t)).append(',').append(durationAvg); + rowPerLength.computeIfAbsent(length, t -> new StringBuilder("" + t)).append(',').append(Math.round(durationAvg)); } } LOGGER.info(header.toString()); @@ -77,22 +106,38 @@ static SubscriptionRequest clientSubOnTopic(String clientID, String topicName) { return SubscriptionRequest.buildNonShared(clientID, asTopic(topicName), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); } + @Disabled @Test - @Timeout(value = MAX_DURATION_S) - public void testManyClientsFewTopics() throws InterruptedException { + public void testManyClientsFewTopics() throws InterruptedException, Exception { LOGGER.info("testManyClientsFewTopics"); - List subscriptionList = prepareSubscriptionsManyClientsFewTopic(50_000); - createSubscriptions(subscriptionList, 1); + clearResults(); + Callable> subCreate = () -> prepareSubscriptionsManyClientsFewTopic(20_000); + test(subCreate); } - @Test @Disabled + @Test public void testFlat() throws Exception { LOGGER.info("TestFlat"); - TEST_RESULTS.clear(); + clearResults(); Callable> subCreate = () -> prepareSubscriptionsFlat(TOTAL_SUBSCRIPTIONS); - for (int length : new int[]{1, 2, 3, 4, 5, 6}) { + test(subCreate); + } + + @Disabled + @Test + public void testDeep() throws InterruptedException, Exception { + LOGGER.info("TestDeep"); + clearResults(); + Callable> subCreate = () -> prepareSubscriptionsDeep(TOTAL_SUBSCRIPTIONS); + test(subCreate); + } + + private void test(Callable> subCreate) throws Exception { + for (int length : new int[]{1, 2, 3, 4, 5, 6, 7, 8, 99}) { for (int threads : new int[]{1, 2, 4, 6, 8, 12, 16}) { + test(subCreate, threads, length); + test(subCreate, threads, length); test(subCreate, threads, length); logResults(); } @@ -102,31 +147,25 @@ public void testFlat() throws Exception { private void test(Callable> subCreate, int threadCount, int maxLength) throws Exception { Topic.MAX_TOKEN_LENGTH = maxLength; - createSubscriptions(subCreate.call(), threadCount); - createSubscriptions(subCreate.call(), threadCount); + final List subRequests = subCreate.call(); + CTrie tree = createSubscriptions(subRequests, threadCount); + readSubscriptions(tree, subRequests, threadCount); + removeSubscriptions(tree, subRequests, threadCount); } - @Test - @Timeout(value = MAX_DURATION_S) - public void testDeep() throws InterruptedException { - LOGGER.info("TestDeep"); - List results = prepareSubscriptionsDeep(TOTAL_SUBSCRIPTIONS); - createSubscriptions(results, 1); - } - - public void createSubscriptions(List results, int threadCount) throws InterruptedException { + public CTrie createSubscriptions(List subsToAdd, int threadCount) throws InterruptedException { long start = System.currentTimeMillis(); CTrie tree = new CTrie(); List> subLists = new ArrayList<>(); - final int total = results.size(); + final int total = subsToAdd.size(); int perList = total / threadCount; int startIdx = 0; for (int idx = 0; idx < threadCount - 1; idx++) { int endIdx = startIdx + perList; - subLists.add(results.subList(startIdx, endIdx)); + subLists.add(subsToAdd.subList(startIdx, endIdx)); startIdx = endIdx; } - subLists.add(results.subList(startIdx, total - 1)); + subLists.add(subsToAdd.subList(startIdx, total)); List threads = new ArrayList<>(); for (int idx = 0; idx < threadCount; idx++) { @@ -161,19 +200,132 @@ public void createSubscriptions(List results, int threadCou long duration = end - start; final long speed = Math.round(1000.0 * total / duration); LOGGER.info("{}, {}, {}, {}, {}", threadCount, Topic.MAX_TOKEN_LENGTH, total, duration, speed); - addResult(new TestResult(threadCount, Topic.MAX_TOKEN_LENGTH, total, duration)); + addAddResult(new TestResult(threadCount, Topic.MAX_TOKEN_LENGTH, total, duration)); + return tree; + } + + public CTrie removeSubscriptions(CTrie tree, List subsToAdd, int threadCount) throws InterruptedException { + long start = System.currentTimeMillis(); + + List> subLists = new ArrayList<>(); + final int total = subsToAdd.size(); + int perList = total / threadCount; + int startIdx = 0; + for (int idx = 0; idx < threadCount - 1; idx++) { + int endIdx = startIdx + perList; + subLists.add(subsToAdd.subList(startIdx, endIdx)); + startIdx = endIdx; + } + subLists.add(subsToAdd.subList(startIdx, total)); + + List threads = new ArrayList<>(); + for (int idx = 0; idx < threadCount; idx++) { + final List workList = subLists.get(idx); + threads.add(new Thread(() -> { + int log = CHECK_INTERVAL; + int count = 0; + for (SubscriptionRequest subReq : workList) { + tree.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared(subReq.getClientId(), subReq.getTopicFilter())); + count++; + log--; + if (log <= 0) { + log = CHECK_INTERVAL; + long end = System.currentTimeMillis(); + long duration = end - start; + LOGGER.info("Threads {}, Subs {}, time(ms) {}, perMs {}", threadCount, count, duration, Math.round(1.0 * count / duration)); + } + if (Thread.currentThread().isInterrupted()) { + return; + } + } + })); + } + for (Thread thread : threads) { + thread.start(); + } + for (Thread thread : threads) { + thread.join(); + } + + long end = System.currentTimeMillis(); + long duration = end - start; + final long speed = Math.round(1000.0 * total / duration); + LOGGER.info("{}, {}, {}, {}, {}", threadCount, Topic.MAX_TOKEN_LENGTH, total, duration, speed); + addRemoveResult(new TestResult(threadCount, Topic.MAX_TOKEN_LENGTH, total, duration)); + return tree; + } + + public void readSubscriptions(CTrie tree, List subsToRead, int threadCount) throws InterruptedException { + final long start1 = System.currentTimeMillis(); + List> subLists = new ArrayList<>(); + final int total = subsToRead.size(); + int perList = total / threadCount; + int startIdx = 0; + for (int idx = 0; idx < threadCount - 1; idx++) { + int endIdx = startIdx + perList; + subLists.add(subsToRead.subList(startIdx, endIdx)); + startIdx = endIdx; + } + subLists.add(subsToRead.subList(startIdx, total)); + + List threads = new ArrayList<>(); + for (int idx = 0; idx < threadCount; idx++) { + final List workList = subLists.get(idx); + threads.add(new Thread(() -> { + int log = CHECK_INTERVAL; + int count = 0; + for (SubscriptionRequest subReq : workList) { + + final Subscription subscription = subReq.subscription(); + final Topic topic = subReq.getTopicFilter(); + final List recursiveMatch = tree.recursiveMatch(topic); + final boolean contains = recursiveMatch.contains(subscription); + Assertions.assertTrue(contains, () -> "Failed to find " + subscription + " on " + topic + " found " + recursiveMatch.size() + " matches."); + + count++; + log--; + if (log <= 0) { + log = CHECK_INTERVAL; + long end = System.currentTimeMillis(); + long duration = end - start1; + LOGGER.info("Threads {}, Subs {}, time(ms) {}, perMs {}", threadCount, count, duration, Math.round(1.0 * count / duration)); + } + if (Thread.currentThread().isInterrupted()) { + return; + } + } + })); + } + for (Thread thread : threads) { + thread.start(); + } + for (Thread thread : threads) { + thread.join(); + } + + long end = System.currentTimeMillis(); + long duration = end - start1; + final long speed = Math.round(1000.0 * total / duration); + addReadResult(new TestResult(threadCount, Topic.MAX_TOKEN_LENGTH, total, duration)); + LOGGER.info("{}, {}, {}, {}, {}", threadCount, Topic.MAX_TOKEN_LENGTH, total, duration, speed); } public List prepareSubscriptionsManyClientsFewTopic(int subCount) { List subscriptionList = new ArrayList<>(subCount); + int total = 0; long start = System.currentTimeMillis(); - for (int i = 0; i < subCount; i++) { - Topic topic = asTopic("topic/test/" + new Random().nextInt(10) + "/test"); - subscriptionList.add(SubscriptionRequest.buildNonShared("TestClient-" + i, topic, MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_LEAST_ONCE))); + int groupSize = subCount / 10; + for (int i = 0; i < groupSize; i++) { + for (int group = 0; group < 10; group++) { + int idx = group * groupSize + i; + Topic topic = asTopic("topic/test"); + subscriptionList.add(SubscriptionRequest.buildNonShared("TestClient-" + idx, topic, MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_LEAST_ONCE))); + total++; + } } long end = System.currentTimeMillis(); long duration = end - start; - LOGGER.debug("Prepared {} subscriptions in {} ms on 10 topics", subCount, duration); + LOGGER.info("Prepared {} subscriptions in {} ms on 10 topics", total, duration); return subscriptionList; } From f38b771850d59b05ab6d21f0ee6a4206afe51629 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Tue, 20 Aug 2024 14:47:38 +0200 Subject: [PATCH 7/9] Test use of PCollections --- broker/pom.xml | 6 ++ .../moquette/broker/subscriptions/CNode.java | 77 +++++++++---------- .../CTrieSubscriptionDirectory.java | 1 - .../broker/subscriptions/CTrieTest.java | 9 ++- 4 files changed, 46 insertions(+), 47 deletions(-) diff --git a/broker/pom.xml b/broker/pom.xml index 05e14b9b1..ef19f29cc 100644 --- a/broker/pom.xml +++ b/broker/pom.xml @@ -187,6 +187,12 @@ 1.15 + + org.pcollections + pcollections + 4.0.2 + + org.fusesource.mqtt-client mqtt-client diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java index faa349aa7..a58a74794 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java @@ -22,6 +22,7 @@ import java.security.SecureRandom; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -33,52 +34,48 @@ import java.util.Random; import java.util.Set; import java.util.stream.Collectors; +import org.pcollections.PMap; +import org.pcollections.TreePMap; class CNode implements Comparable { public static final Random SECURE_RANDOM = new SecureRandom(); private final Token token; - private final List children; - // Sorted list of subscriptions. The sort is necessary for fast access, instead of linear scan. - private List subscriptions; + private PMap children; + // Map of subscriptions. Not a Set, because Set doesn't have a Get method and we may need to update. + private PMap subscriptions; // the list of SharedSubscription is sorted. The sort is necessary for fast access, instead of linear scan. private Map> sharedSubscriptions; CNode(Token token) { - this.children = new ArrayList<>(); - this.subscriptions = new ArrayList<>(); + this.children = TreePMap.empty(); + this.subscriptions = TreePMap.empty(); this.sharedSubscriptions = new HashMap<>(); this.token = token; } //Copy constructor - private CNode(Token token, List children, List subscriptions, Map> sharedSubscriptions) { + private CNode(Token token, PMap children, PMap subscriptions, Map> sharedSubscriptions) { this.token = token; // keep reference, root comparison in directory logic relies on it for now. - this.subscriptions = new ArrayList<>(subscriptions); + this.subscriptions = subscriptions; this.sharedSubscriptions = new HashMap<>(sharedSubscriptions); - this.children = new ArrayList<>(children); + this.children = children; } public Token getToken() { return token; } - List allChildren() { - return new ArrayList<>(this.children); + Collection allChildren() { + return this.children.values(); } Optional childOf(Token token) { - int idx = findIndexForToken(token); - if (idx < 0) { + INode value = children.get(token.name); + if (value == null) { return Optional.empty(); } - return Optional.of(children.get(idx)); - } - - private int findIndexForToken(Token token) { - final INode tempTokenNode = new INode(new CNode(token)); - return Collections.binarySearch(children, tempTokenNode, (INode node, INode tokenHolder) -> node.mainNode().token.compareTo(tokenHolder.mainNode().token)); + return Optional.of(value); } @Override @@ -91,17 +88,15 @@ CNode copy() { } public void add(INode newINode) { - int idx = findIndexForToken(newINode.mainNode().token); - if (idx < 0) { - children.add(-1 - idx, newINode); - } else { - children.add(idx, newINode); - } + final String tokenName = newINode.mainNode().token.name; + children = children.plus(tokenName, newINode); } public INode remove(INode node) { - int idx = findIndexForToken(node.mainNode().token); - return this.children.remove(idx); + final String tokenName = node.mainNode().token.name; + INode toRemove = children.get(tokenName); + children = children.minus(tokenName); + return toRemove; } private List sharedSubscriptions() { @@ -118,8 +113,8 @@ private List sharedSubscriptions() { return selectedSubscriptions; } - List subscriptions() { - return subscriptions; + Set subscriptions() { + return subscriptions.keySet(); } // Mutating operation @@ -141,25 +136,23 @@ CNode addSubscription(SubscriptionRequest request) { final Subscription newSubscription = request.subscription(); // if already contains one with same topic and same client, keep that with higher QoS - int idx = Collections.binarySearch(subscriptions, newSubscription); - if (idx >= 0) { + final Subscription existing = subscriptions.get(newSubscription); + if (existing != null) { // Subscription already exists - final Subscription existing = subscriptions.get(idx); if (needsToUpdateExistingSubscription(newSubscription, existing)) { - subscriptions.set(idx, newSubscription); + subscriptions = subscriptions.plus(newSubscription, newSubscription); } } else { // insert into the expected index so that the sorting is maintained - this.subscriptions.add(-1 - idx, newSubscription); + subscriptions = subscriptions.plus(newSubscription, newSubscription); } } return this; } private static boolean needsToUpdateExistingSubscription(Subscription newSubscription, Subscription existing) { - if ((newSubscription.hasSubscriptionIdentifier() && existing.hasSubscriptionIdentifier()) && - newSubscription.getSubscriptionIdentifier().equals(existing.getSubscriptionIdentifier()) - ) { + if ((newSubscription.hasSubscriptionIdentifier() && existing.hasSubscriptionIdentifier()) + && newSubscription.getSubscriptionIdentifier().equals(existing.getSubscriptionIdentifier())) { // if subscription identifier hasn't changed, // then check QoS but don't lower the requested QoS level return existing.option().qos().value() < newSubscription.option().qos().value(); @@ -177,7 +170,7 @@ private static boolean needsToUpdateExistingSubscription(Subscription newSubscri * AND at least one subscription is actually present for that clientId * */ boolean containsOnly(String clientId) { - for (Subscription sub : this.subscriptions) { + for (Subscription sub : this.subscriptions.values()) { if (!sub.clientId.equals(clientId)) { return false; } @@ -207,7 +200,7 @@ private static SharedSubscription wrapKey(String clientId) { //TODO this is equivalent to negate(containsOnly(clientId)) private boolean containsSubscriptionsForClient(String clientId) { - for (Subscription sub : this.subscriptions) { + for (Subscription sub : this.subscriptions.values()) { if (sub.clientId.equals(clientId)) { return true; } @@ -233,13 +226,13 @@ void removeSubscriptionsFor(UnsubscribeRequest request) { } else { // collect Subscription instances to remove Set toRemove = new HashSet<>(); - for (Subscription sub : this.subscriptions) { + for (Subscription sub : this.subscriptions.values()) { if (sub.clientId.equals(clientId)) { toRemove.add(sub); } } // effectively remove the instances - this.subscriptions.removeAll(toRemove); + subscriptions = subscriptions.minusAll(toRemove); } } @@ -251,7 +244,7 @@ public int compareTo(CNode o) { public List sharedAndNonSharedSubscriptions() { List shared = sharedSubscriptions(); List returnedSubscriptions = new ArrayList<>(subscriptions.size() + shared.size()); - returnedSubscriptions.addAll(subscriptions); + returnedSubscriptions.addAll(subscriptions.values()); returnedSubscriptions.addAll(shared); return returnedSubscriptions; } diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java index e8efbac08..88feea7e1 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java @@ -24,7 +24,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java index 524aab39b..2b42ab4ae 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java @@ -27,6 +27,7 @@ import static io.moquette.broker.subscriptions.SubscriptionTestUtils.asSubscription; import static io.moquette.broker.subscriptions.Topic.asTopic; import java.util.List; +import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -54,11 +55,11 @@ public void testAddOnSecondLayerWithEmptyTokenOnEmptyTree() { assertThat(this.sut.root.mainNode().subscriptions()).isEmpty(); assertThat(this.sut.root.mainNode().allChildren()).isNotEmpty(); - INode firstLayer = this.sut.root.mainNode().allChildren().get(0); + INode firstLayer = this.sut.root.mainNode().allChildren().stream().findFirst().get(); assertThat(firstLayer.mainNode().subscriptions()).isEmpty(); assertThat(firstLayer.mainNode().allChildren()).isNotEmpty(); - INode secondLayer = firstLayer.mainNode().allChildren().get(0); + INode secondLayer = firstLayer.mainNode().allChildren().stream().findFirst().get(); assertThat(secondLayer.mainNode().subscriptions()).isNotEmpty(); assertThat(secondLayer.mainNode().allChildren()).isEmpty(); } @@ -99,7 +100,7 @@ public void testAddNewSubscriptionOnExistingNode() { //Verify final Optional matchedNode = sut.lookup(asTopic("/temp")); assertTrue(matchedNode.isPresent(), "Node on path /temp must be present"); - final List subscriptions = matchedNode.get().subscriptions(); + final Set subscriptions = matchedNode.get().subscriptions(); assertTrue(subscriptions.contains(asSubscription("TempSensor2", "/temp"))); } @@ -117,7 +118,7 @@ public void testAddNewDeepNodes() { //Verify final Optional matchedNode = sut.lookup(asTopic("/italy/happiness")); assertTrue(matchedNode.isPresent(), "Node on path /italy/happiness must be present"); - final List subscriptions = matchedNode.get().subscriptions(); + final Set subscriptions = matchedNode.get().subscriptions(); assertTrue(subscriptions.contains(asSubscription("HappinessSensor", "/italy/happiness"))); } From 0642876e1de86767ac11567840bef9279026483c Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Sun, 25 Aug 2024 10:15:27 +0200 Subject: [PATCH 8/9] Improved subscription remove performance --- .../moquette/broker/subscriptions/CNode.java | 37 ++++++------------- .../broker/subscriptions/CTrieTest.java | 5 ++- 2 files changed, 15 insertions(+), 27 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java index a58a74794..232c74171 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java @@ -42,8 +42,8 @@ class CNode implements Comparable { public static final Random SECURE_RANDOM = new SecureRandom(); private final Token token; private PMap children; - // Map of subscriptions. Not a Set, because Set doesn't have a Get method and we may need to update. - private PMap subscriptions; + // Map of subscriptions per clientId. + private PMap subscriptions; // the list of SharedSubscription is sorted. The sort is necessary for fast access, instead of linear scan. private Map> sharedSubscriptions; @@ -55,7 +55,7 @@ class CNode implements Comparable { } //Copy constructor - private CNode(Token token, PMap children, PMap subscriptions, Map> sharedSubscriptions) { + private CNode(Token token, PMap children, PMap subscriptions, Map> sharedSubscriptions) { this.token = token; // keep reference, root comparison in directory logic relies on it for now. this.subscriptions = subscriptions; this.sharedSubscriptions = new HashMap<>(sharedSubscriptions); @@ -113,8 +113,8 @@ private List sharedSubscriptions() { return selectedSubscriptions; } - Set subscriptions() { - return subscriptions.keySet(); + Collection subscriptions() { + return subscriptions.values(); } // Mutating operation @@ -136,15 +136,15 @@ CNode addSubscription(SubscriptionRequest request) { final Subscription newSubscription = request.subscription(); // if already contains one with same topic and same client, keep that with higher QoS - final Subscription existing = subscriptions.get(newSubscription); + final Subscription existing = subscriptions.get(newSubscription.clientId); if (existing != null) { // Subscription already exists if (needsToUpdateExistingSubscription(newSubscription, existing)) { - subscriptions = subscriptions.plus(newSubscription, newSubscription); + subscriptions = subscriptions.plus(newSubscription.clientId, newSubscription); } } else { // insert into the expected index so that the sorting is maintained - subscriptions = subscriptions.plus(newSubscription, newSubscription); + subscriptions = subscriptions.plus(newSubscription.clientId, newSubscription); } } return this; @@ -170,8 +170,8 @@ private static boolean needsToUpdateExistingSubscription(Subscription newSubscri * AND at least one subscription is actually present for that clientId * */ boolean containsOnly(String clientId) { - for (Subscription sub : this.subscriptions.values()) { - if (!sub.clientId.equals(clientId)) { + for (String sub : this.subscriptions.keySet()) { + if (!sub.equals(clientId)) { return false; } } @@ -200,12 +200,7 @@ private static SharedSubscription wrapKey(String clientId) { //TODO this is equivalent to negate(containsOnly(clientId)) private boolean containsSubscriptionsForClient(String clientId) { - for (Subscription sub : this.subscriptions.values()) { - if (sub.clientId.equals(clientId)) { - return true; - } - } - return false; + return subscriptions.containsKey(clientId); } void removeSubscriptionsFor(UnsubscribeRequest request) { @@ -224,15 +219,7 @@ void removeSubscriptionsFor(UnsubscribeRequest request) { this.sharedSubscriptions.replace(request.getSharedName(), subscriptionsForName); } } else { - // collect Subscription instances to remove - Set toRemove = new HashSet<>(); - for (Subscription sub : this.subscriptions.values()) { - if (sub.clientId.equals(clientId)) { - toRemove.add(sub); - } - } - // effectively remove the instances - subscriptions = subscriptions.minusAll(toRemove); + subscriptions = subscriptions.minus(clientId); } } diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java index 2b42ab4ae..7a7221abe 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java @@ -26,6 +26,7 @@ import static io.moquette.broker.subscriptions.SubscriptionTestUtils.asSubscription; import static io.moquette.broker.subscriptions.Topic.asTopic; +import java.util.Collection; import java.util.List; import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; @@ -100,7 +101,7 @@ public void testAddNewSubscriptionOnExistingNode() { //Verify final Optional matchedNode = sut.lookup(asTopic("/temp")); assertTrue(matchedNode.isPresent(), "Node on path /temp must be present"); - final Set subscriptions = matchedNode.get().subscriptions(); + final Collection subscriptions = matchedNode.get().subscriptions(); assertTrue(subscriptions.contains(asSubscription("TempSensor2", "/temp"))); } @@ -118,7 +119,7 @@ public void testAddNewDeepNodes() { //Verify final Optional matchedNode = sut.lookup(asTopic("/italy/happiness")); assertTrue(matchedNode.isPresent(), "Node on path /italy/happiness must be present"); - final Set subscriptions = matchedNode.get().subscriptions(); + final Collection subscriptions = matchedNode.get().subscriptions(); assertTrue(subscriptions.contains(asSubscription("HappinessSensor", "/italy/happiness"))); } From e9133660540249defe5f7c29a7ab65a45ef8605c Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Sat, 31 Aug 2024 17:43:28 +0200 Subject: [PATCH 9/9] Rewrote subscription fetching to not copy lists of subscriptions --- .../java/io/moquette/broker/PostOffice.java | 3 +- .../moquette/broker/subscriptions/CNode.java | 37 ++--- .../moquette/broker/subscriptions/CTrie.java | 29 ++-- .../CTrieSubscriptionDirectory.java | 27 +--- .../broker/subscriptions/DumpTreeVisitor.java | 11 +- .../ISubscriptionsDirectory.java | 4 +- .../broker/subscriptions/ShareName.java | 11 +- .../subscriptions/SubscriptionCollection.java | 133 ++++++++++++++++++ .../SubscriptionCounterVisitor.java | 4 +- .../broker/PostOfficeInternalPublishTest.java | 3 +- .../broker/PostOfficePublishTest.java | 3 +- .../broker/PostOfficeSubscribeTest.java | 5 +- .../broker/PostOfficeUnsubscribeTest.java | 3 +- ...aredSubscriptionDirectoryMatchingTest.java | 14 +- .../broker/subscriptions/CTrieSpeedTest.java | 12 +- ...TrieSubscriptionDirectoryMatchingTest.java | 29 ++-- .../broker/subscriptions/CTrieTest.java | 53 ++++--- .../mqtt5/RequestResponseTest.java | 6 + 18 files changed, 252 insertions(+), 135 deletions(-) create mode 100644 broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCollection.java diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index 5ff719ac5..880c28460 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -58,6 +58,7 @@ import java.util.stream.Collectors; import static io.moquette.broker.Utils.messageId; +import io.moquette.broker.subscriptions.SubscriptionCollection; import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE; @@ -842,7 +843,7 @@ private RoutingResults publish2Subscribers(String publisherClientId, final boolean retainPublish = msg.fixedHeader().isRetain(); final Topic topic = new Topic(msg.variableHeader().topicName()); final MqttQoS publishingQos = msg.fixedHeader().qosLevel(); - List topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic); + SubscriptionCollection topicMatchingSubscriptions = subscriptions.matchWithoutQosSharpening(topic); if (topicMatchingSubscriptions.isEmpty()) { // no matching subscriptions, clean exit LOG.trace("No matching subscriptions for topic: {}", topic); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java index 232c74171..a37fe05c7 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java @@ -45,20 +45,20 @@ class CNode implements Comparable { // Map of subscriptions per clientId. private PMap subscriptions; // the list of SharedSubscription is sorted. The sort is necessary for fast access, instead of linear scan. - private Map> sharedSubscriptions; + private PMap> sharedSubscriptions; CNode(Token token) { this.children = TreePMap.empty(); this.subscriptions = TreePMap.empty(); - this.sharedSubscriptions = new HashMap<>(); + this.sharedSubscriptions = TreePMap.empty(); this.token = token; } //Copy constructor - private CNode(Token token, PMap children, PMap subscriptions, Map> sharedSubscriptions) { + private CNode(Token token, PMap children, PMap subscriptions, PMap> sharedSubscriptions) { this.token = token; // keep reference, root comparison in directory logic relies on it for now. this.subscriptions = subscriptions; - this.sharedSubscriptions = new HashMap<>(sharedSubscriptions); + this.sharedSubscriptions = sharedSubscriptions; this.children = children; } @@ -99,22 +99,12 @@ public INode remove(INode node) { return toRemove; } - private List sharedSubscriptions() { - List selectedSubscriptions = new ArrayList<>(sharedSubscriptions.size()); - // for each sharedSubscription related to a ShareName, select one subscription - for (Map.Entry> subsForName : sharedSubscriptions.entrySet()) { - List list = subsForName.getValue(); - final String shareName = subsForName.getKey().getShareName(); - // select a subscription randomly - int randIdx = SECURE_RANDOM.nextInt(list.size()); - SharedSubscription sub = list.get(randIdx); - selectedSubscriptions.add(sub.createSubscription()); - } - return selectedSubscriptions; + public PMap getSubscriptions() { + return subscriptions; } - Collection subscriptions() { - return subscriptions.values(); + public PMap> getSharedSubscriptions() { + return sharedSubscriptions; } // Mutating operation @@ -214,9 +204,9 @@ void removeSubscriptionsFor(UnsubscribeRequest request) { subscriptionsForName.removeAll(toRemove); if (subscriptionsForName.isEmpty()) { - this.sharedSubscriptions.remove(request.getSharedName()); + sharedSubscriptions = sharedSubscriptions.minus(request.getSharedName()); } else { - this.sharedSubscriptions.replace(request.getSharedName(), subscriptionsForName); + sharedSubscriptions = sharedSubscriptions.plus(request.getSharedName(), subscriptionsForName); } } else { subscriptions = subscriptions.minus(clientId); @@ -228,11 +218,4 @@ public int compareTo(CNode o) { return token.compareTo(o.token); } - public List sharedAndNonSharedSubscriptions() { - List shared = sharedSubscriptions(); - List returnedSubscriptions = new ArrayList<>(subscriptions.size() + shared.size()); - returnedSubscriptions.addAll(subscriptions.values()); - returnedSubscriptions.addAll(shared); - return returnedSubscriptions; - } } diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java index 9cbbc9ef6..9bdb03588 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java @@ -118,6 +118,7 @@ public SubscriptionIdentifier getSubscriptionIdentifier() { * Models a request to unsubscribe a client, it's carrier for the Subscription * */ public final static class UnsubscribeRequest { + private final Topic topicFilter; private final String clientId; private boolean shared = false; @@ -231,21 +232,25 @@ private NavigationAction evaluate(Topic topicName, CNode cnode, int depth) { return NavigationAction.STOP; } - public List recursiveMatch(Topic topicName) { - return recursiveMatch(topicName, this.root, 0); + public SubscriptionCollection recursiveMatch(Topic topicName) { + SubscriptionCollection subscriptions = new SubscriptionCollection(); + recursiveMatch(topicName, this.root, 0, subscriptions); + return subscriptions; } - private List recursiveMatch(Topic topicName, INode inode, int depth) { + private void recursiveMatch(Topic topicName, INode inode, int depth, SubscriptionCollection target) { CNode cnode = inode.mainNode(); if (cnode instanceof TNode) { - return Collections.emptyList(); + return; } NavigationAction action = evaluate(topicName, cnode, depth); if (action == NavigationAction.MATCH) { - return cnode.sharedAndNonSharedSubscriptions(); + target.addNormalSubscriptions(cnode.getSubscriptions()); + target.addSharedSubscriptions(cnode.getSharedSubscriptions()); + return; } if (action == NavigationAction.STOP) { - return Collections.emptyList(); + return; } final boolean isRoot = ROOT.equals(cnode.getToken()); final boolean isSingle = Token.SINGLE.equals(cnode.getToken()); @@ -256,27 +261,27 @@ private List recursiveMatch(Topic topicName, INode inode, int dept : (isSingle || isMulti) ? topicName.exceptFullHeadToken() : topicName.exceptHeadToken(); - List subscriptions = new ArrayList<>(); + SubscriptionCollection subscriptions = new SubscriptionCollection(); // We should only consider the maximum three children children of // type #, + or exact match Optional subInode = cnode.childOf(Token.MULTI); if (subInode.isPresent()) { - subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1)); + recursiveMatch(remainingTopic, subInode.get(), depth + 1, target); } subInode = cnode.childOf(Token.SINGLE); if (subInode.isPresent()) { - subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1)); + recursiveMatch(remainingTopic, subInode.get(), depth + 1, target); } if (remainingTopic.isEmpty()) { - subscriptions.addAll(cnode.sharedAndNonSharedSubscriptions()); + target.addNormalSubscriptions(cnode.getSubscriptions()); + target.addSharedSubscriptions(cnode.getSharedSubscriptions()); } else { subInode = cnode.childOf(remainingTopic.headToken()); if (subInode.isPresent()) { - subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1)); + recursiveMatch(remainingTopic, subInode.get(), depth + 1, target); } } - return subscriptions; } /** diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java index 88feea7e1..a7c15987e 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -77,34 +78,10 @@ public void init(ISubscriptionsRepository subscriptionsRepository) { * @return the list of matching subscriptions, or empty if not matching. */ @Override - public List matchWithoutQosSharpening(Topic topicName) { + public SubscriptionCollection matchWithoutQosSharpening(Topic topicName) { return ctrie.recursiveMatch(topicName); } - @Override - public List matchQosSharpening(Topic topicName) { - final List subscriptions = matchWithoutQosSharpening(topicName); - - // for each session select the subscription with higher QoS - return selectSubscriptionsWithHigherQoSForEachSession(subscriptions); - } - - private static List selectSubscriptionsWithHigherQoSForEachSession(List subscriptions) { - // for each session select the subscription with higher QoS - Map subsGroupedByClient = new HashMap<>(); - for (Subscription sub : subscriptions) { - // If same client is subscribed to two different shared subscription that overlaps - // then it has to return both subscriptions, because the share name made them independent. - final String key = sub.clientAndShareName(); - Subscription existingSub = subsGroupedByClient.get(key); - // update the selected subscriptions if not present or if it has a greater qos - if (existingSub == null || existingSub.qosLessThan(sub)) { - subsGroupedByClient.put(key, sub); - } - } - return new ArrayList<>(subsGroupedByClient.values()); - } - @Override public boolean add(String clientId, Topic filter, MqttSubscriptionOption option) { SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, option); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java b/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java index 3647ee0b8..09c73b2db 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java @@ -31,22 +31,21 @@ private String prettySubscriptions(CNode node) { if (node instanceof TNode) { return "TNode"; } - if (node.subscriptions().isEmpty()) { + if (node.getSubscriptions().isEmpty()) { return StringUtil.EMPTY_STRING; } StringBuilder subScriptionsStr = new StringBuilder(" ~~["); int counter = 0; - for (Subscription couple : node.subscriptions()) { + for (Subscription couple : node.getSubscriptions().values()) { subScriptionsStr .append("{filter=").append(couple.topicFilter).append(", ") .append("option=").append(couple.option()).append(", ") .append("client='").append(couple.clientId).append("'}"); counter++; - if (counter < node.subscriptions().size()) { - subScriptionsStr.append(";"); - } + subScriptionsStr.append(";"); } - return subScriptionsStr.append("]").toString(); + final int length = subScriptionsStr.length(); + return subScriptionsStr.replace(length - 1, length, "]").toString(); } private String indentTabs(int deep) { diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java index 97af38035..d6e11c5c3 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java @@ -27,9 +27,7 @@ public interface ISubscriptionsDirectory { void init(ISubscriptionsRepository sessionsRepository); - List matchWithoutQosSharpening(Topic topic); - - List matchQosSharpening(Topic topic); + SubscriptionCollection matchWithoutQosSharpening(Topic topic); boolean add(String clientId, Topic filter, MqttSubscriptionOption option); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java b/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java index 2ec25b0fe..3f5f66caf 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java @@ -21,7 +21,7 @@ * Shared subscription's name. */ // It's public because used by PostOffice -public final class ShareName { +public final class ShareName implements Comparable{ private final String shareName; public ShareName(String shareName) { @@ -36,8 +36,8 @@ public boolean equals(Object o) { return Objects.equals(shareName, (String) o); } if (getClass() != o.getClass()) return false; - ShareName shareName1 = (ShareName) o; - return Objects.equals(shareName, shareName1.shareName); + ShareName oShareName = (ShareName) o; + return Objects.equals(shareName, oShareName.shareName); } public String getShareName() { @@ -55,4 +55,9 @@ public String toString() { "shareName='" + shareName + '\'' + '}'; } + + @Override + public int compareTo(ShareName o) { + return shareName.compareTo(o.shareName); + } } diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCollection.java b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCollection.java new file mode 100644 index 000000000..8f2065b0c --- /dev/null +++ b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCollection.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2012-2018 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.moquette.broker.subscriptions; + +import static io.moquette.broker.subscriptions.CNode.SECURE_RANDOM; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +/** + * A wrapper over multiple maps of normal subscriptions. + */ +public class SubscriptionCollection implements Iterable { + + private final List> normalSubscriptions = new ArrayList<>(); + private final List>> sharedSubscriptions = new ArrayList<>(); + + public boolean isEmpty() { + return normalSubscriptions.isEmpty() && sharedSubscriptions.isEmpty(); + } + + /** + * Calculates the number of subscriptions. Expensive, only use for tests! + * @return the number of subscriptions. + */ + public int size() { + int total = 0; + for (Map var : normalSubscriptions) { + total += var.size(); + } + for (Map> var : sharedSubscriptions) { + total += var.size(); + } + return total; + } + + public void addNormalSubscriptions(Map subs) { + if (subs.isEmpty()) { + return; + } + normalSubscriptions.add(subs); + } + + public void addSharedSubscriptions(Map> subs) { + if (sharedSubscriptions.isEmpty()) { + return; + } + sharedSubscriptions.add(subs); + } + + private static Subscription selectRandom(List list) { + // select a subscription randomly + int randIdx = SECURE_RANDOM.nextInt(list.size()); + return list.get(randIdx).createSubscription(); + } + + @Override + public Iterator iterator() { + return new IteratorImpl(this); + } + + private static class IteratorImpl implements Iterator { + + private Iterator> normapSubListIter; + private Iterator normalSubIter; + + private Iterator>> sharedSubMapIter; + private Iterator> sharedSubIter; + + public IteratorImpl(SubscriptionCollection parent) { + normapSubListIter = parent.normalSubscriptions.iterator(); + sharedSubMapIter = parent.sharedSubscriptions.iterator(); + } + + @Override + public boolean hasNext() { + if (normalSubIter != null && normalSubIter.hasNext()) { + return true; + } + if (sharedSubIter != null && sharedSubIter.hasNext()) { + return true; + } + if (normapSubListIter != null) { + if (normapSubListIter.hasNext()) { + // Get the next normal subscriptions iterator. + Map next = normapSubListIter.next(); + normalSubIter = next.values().iterator(); + return true; + } else { + // Reached the end of the normal subscriptions lists. + normapSubListIter = null; + } + } + if (sharedSubMapIter != null) { + if (sharedSubMapIter.hasNext()) { + Map> next = sharedSubMapIter.next(); + sharedSubIter = next.values().iterator(); + return true; + } else { + sharedSubMapIter = null; + } + } + return false; + } + + @Override + public Subscription next() { + if (normalSubIter != null) { + return normalSubIter.next(); + } + if (sharedSubIter != null) { + return selectRandom(sharedSubIter.next()); + } + throw new NoSuchElementException("Fetched past the end of Iterator, make sure to call hasNext!"); + } + } + +} diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCounterVisitor.java b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCounterVisitor.java index 863e944c3..9d06835d9 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCounterVisitor.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCounterVisitor.java @@ -19,11 +19,11 @@ class SubscriptionCounterVisitor implements CTrie.IVisitor { - private AtomicInteger accumulator = new AtomicInteger(0); + private final AtomicInteger accumulator = new AtomicInteger(0); @Override public void visit(CNode node, int deep) { - accumulator.addAndGet(node.subscriptions().size()); + accumulator.addAndGet(node.getSubscriptions().size()); } @Override diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java index 5c285ad14..626e79b89 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java @@ -37,6 +37,7 @@ import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; import static io.moquette.broker.PostOfficeUnsubscribeTest.CONFIG; +import io.moquette.broker.subscriptions.SubscriptionCollection; import static io.netty.handler.codec.mqtt.MqttQoS.*; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Collections.singleton; @@ -338,7 +339,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); - final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final SubscriptionCollection matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); diff --git a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java index b3314fb6b..dfbf46dbf 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java @@ -40,6 +40,7 @@ import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; import static io.moquette.broker.PostOfficeUnsubscribeTest.CONFIG; +import io.moquette.broker.subscriptions.SubscriptionCollection; import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE; @@ -198,7 +199,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); - final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final SubscriptionCollection matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java index f0b25dcff..8443b7bc8 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java @@ -43,6 +43,7 @@ import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; import static io.moquette.broker.PostOfficePublishTest.ALLOW_ANONYMOUS_AND_ZERO_BYTES_CLID; import static io.moquette.broker.PostOfficePublishTest.SUBSCRIBER_ID; +import io.moquette.broker.subscriptions.SubscriptionCollection; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE; import static java.util.Collections.singleton; @@ -146,7 +147,7 @@ protected void subscribe(EmbeddedChannel channel, String topic, MqttQoS desiredQ final String clientId = NettyUtils.clientID(channel); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); - final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final SubscriptionCollection matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); @@ -166,7 +167,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); - final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final SubscriptionCollection matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java index 13bda420f..279e62995 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java @@ -39,6 +39,7 @@ import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; import static io.moquette.broker.PostOfficePublishTest.PUBLISHER_ID; +import io.moquette.broker.subscriptions.SubscriptionCollection; import static io.netty.handler.codec.mqtt.MqttQoS.*; import static java.util.Collections.*; import java.util.List; @@ -125,7 +126,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); - final List matchedSubscriptions = subscriptions.matchQosSharpening(new Topic(topic)); + final SubscriptionCollection matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); //assertTrue(matchedSubscriptions.size() >=1); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java index cb83fcb6f..bc72414fb 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java @@ -54,7 +54,7 @@ public void whenManySharedSubscriptionsOfDifferentShareNameMatchATopicThenOneSub sut.addShared("TempSensor1", new ShareName("temp_sensors"), asTopic("/livingroom"), asOption(MqttQoS.AT_MOST_ONCE)); sut.addShared("TempSensor1", new ShareName("livingroom_devices"), asTopic("/livingroom"), asOption(MqttQoS.AT_MOST_ONCE)); - List matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/livingroom")); + SubscriptionCollection matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/livingroom")); assertThat(matchingSubscriptions) .containsOnly(SubscriptionTestUtils.asSubscription("TempSensor1", "/livingroom", "temp_sensors"), SubscriptionTestUtils.asSubscription("TempSensor1", "/livingroom", "livingroom_devices")) @@ -71,7 +71,7 @@ public void givenSessionHasMultipleSharedSubscriptionWhenTheClientIsRemovedThenN sut.removeSharedSubscriptionsForClient(clientId); // Verify - List matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/livingroom")); + SubscriptionCollection matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/livingroom")); assertThat(matchingSubscriptions).isEmpty(); } @@ -82,7 +82,7 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionIsProcessedThe new SubscriptionIdentifier(1)); // verify it contains the subscription identifier - final List matchingSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); verifySubscriptionIdentifierIsPresent(matchingSubscriptions, new SubscriptionIdentifier(1), "share_temp"); // update the subscription of same clientId on same topic filter but with different subscription identifier @@ -90,11 +90,11 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionIsProcessedThe new SubscriptionIdentifier(123)); // verify the subscription identifier is updated - final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection reloadedSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); verifySubscriptionIdentifierIsPresent(reloadedSubscriptions, new SubscriptionIdentifier(123), "share_temp"); } - private static void verifySubscriptionIdentifierIsPresent(List matchingSubscriptions, SubscriptionIdentifier subscriptionIdentifier, String expectedShareName) { + private static void verifySubscriptionIdentifierIsPresent(SubscriptionCollection matchingSubscriptions, SubscriptionIdentifier subscriptionIdentifier, String expectedShareName) { assertAll("subscription contains the subscription identifier", () -> assertEquals(1, matchingSubscriptions.size()), () -> assertEquals(expectedShareName, matchingSubscriptions.iterator().next().shareName), @@ -111,13 +111,13 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionWithoutSubscri // verify it contains the subscription identifier SubscriptionIdentifier expectedSubscriptionId = new SubscriptionIdentifier(1); - verifySubscriptionIdentifierIsPresent(sut.matchQosSharpening(asTopic("client/test/b")), expectedSubscriptionId, "share_temp"); + verifySubscriptionIdentifierIsPresent(sut.matchWithoutQosSharpening(asTopic("client/test/b")), expectedSubscriptionId, "share_temp"); // update the subscription of same clientId on same topic filter but removing subscription identifier sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE)); // verify the subscription identifier is removed - final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection reloadedSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); assertAll("subscription doesn't contain subscription identifier", () -> assertEquals(1, reloadedSubscriptions.size()), () -> assertFalse(reloadedSubscriptions.iterator().next().hasSubscriptionIdentifier()) diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java index 486a6607e..7f732cd53 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java @@ -79,7 +79,7 @@ private static void clearResults() { } private static void logResults(Map>> set) { - StringBuilder header = new StringBuilder(); + StringBuilder header = new StringBuilder(set.values().iterator().next().values().iterator().next().get(0).topicCount+" topics"); TreeMap rowPerLength = new TreeMap<>(); for (Map.Entry>> entry : set.entrySet()) { Integer threads = entry.getKey(); @@ -278,8 +278,14 @@ public void readSubscriptions(CTrie tree, List subsToRead, final Subscription subscription = subReq.subscription(); final Topic topic = subReq.getTopicFilter(); - final List recursiveMatch = tree.recursiveMatch(topic); - final boolean contains = recursiveMatch.contains(subscription); + final SubscriptionCollection recursiveMatch = tree.recursiveMatch(topic); + boolean contains = false; + for (Subscription sub : recursiveMatch) { + if (sub.equals(subscription)) { + contains = true; + break; + } + } Assertions.assertTrue(contains, () -> "Failed to find " + subscription + " on " + topic + " found " + recursiveMatch.size() + " matches."); count++; diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java index 7d1438732..b6f31d88a 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java @@ -15,7 +15,6 @@ */ package io.moquette.broker.subscriptions; - import io.moquette.broker.ISubscriptionsRepository; import io.moquette.persistence.MemorySubscriptionsRepository; import io.netty.handler.codec.mqtt.MqttQoS; @@ -27,6 +26,7 @@ import static io.moquette.broker.subscriptions.CTrieSharedSubscriptionDirectoryMatchingTest.asOption; import static io.moquette.broker.subscriptions.Topic.asTopic; +import java.util.ArrayList; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.*; @@ -198,7 +198,7 @@ public void testOverlappingSubscriptions() { sut.add(specificSub.clientId, specificSub.topicFilter, specificSub.option()); //Exercise - final List matchingForSpecific = sut.matchQosSharpening(asTopic("a/b")); + final SubscriptionCollection matchingForSpecific = sut.matchWithoutQosSharpening(asTopic("a/b")); // Verify assertThat(matchingForSpecific.size()).isEqualTo(1); @@ -226,7 +226,7 @@ public void removeSubscription_sameClients_subscribedSameTopic() { sut.removeSubscription(asTopic("/topic"), "Sensor1"); // Verify - final List matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/topic")); + final SubscriptionCollection matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/topic")); assertThat(matchingSubscriptions).isEmpty(); } @@ -244,14 +244,17 @@ public void duplicatedSubscriptionsWithDifferentQos() { this.sut.add("client1", asTopic("client/test/b"), asOption(MqttQoS.EXACTLY_ONCE)); // Verify - List subscriptions = this.sut.matchQosSharpening(asTopic("client/test/b")); + SubscriptionCollection subscriptions = this.sut.matchWithoutQosSharpening(asTopic("client/test/b")); assertThat(subscriptions).contains(client1SubQoS2); assertThat(subscriptions).contains(client2Sub); - final Optional matchingClient1Sub = subscriptions - .stream() - .filter(s -> s.equals(client1SubQoS0)) - .findFirst(); + Optional matchingClient1Sub = Optional.empty(); + for (Subscription sub : subscriptions) { + if (sub.equals(client1SubQoS0)) { + matchingClient1Sub = Optional.of(sub); + break; + } + } assertTrue(matchingClient1Sub.isPresent()); Subscription client1Sub = matchingClient1Sub.get(); @@ -267,18 +270,18 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionIsProcessedThe sut.add("client", asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE), new SubscriptionIdentifier(1)); // verify it contains the subscription identifier - final List matchingSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); verifySubscriptionIdentifierIsPresent(matchingSubscriptions, new SubscriptionIdentifier(1)); // update the subscription of same clientId on same topic filter but with different subscription identifier sut.add("client", asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE), new SubscriptionIdentifier(123)); // verify the subscription identifier is updated - final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection reloadedSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); verifySubscriptionIdentifierIsPresent(reloadedSubscriptions, new SubscriptionIdentifier(123)); } - private static void verifySubscriptionIdentifierIsPresent(List matchingSubscriptions, SubscriptionIdentifier subscriptionIdentifier) { + private static void verifySubscriptionIdentifierIsPresent(SubscriptionCollection matchingSubscriptions, SubscriptionIdentifier subscriptionIdentifier) { assertAll("subscription contains the subscription identifier", () -> assertEquals(1, matchingSubscriptions.size()), () -> assertTrue(matchingSubscriptions.iterator().next().hasSubscriptionIdentifier()), @@ -293,13 +296,13 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionWithoutSubscri // verify it contains the subscription identifier SubscriptionIdentifier expectedSubscriptionId = new SubscriptionIdentifier(1); - verifySubscriptionIdentifierIsPresent(sut.matchQosSharpening(asTopic("client/test/b")), expectedSubscriptionId); + verifySubscriptionIdentifierIsPresent(sut.matchWithoutQosSharpening(asTopic("client/test/b")), expectedSubscriptionId); // update the subscription of same clientId on same topic filter but removing subscription identifier sut.add("client", asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE)); // verify the subscription identifier is removed - final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection reloadedSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); assertAll("subscription doesn't contain subscription identifier", () -> assertEquals(1, reloadedSubscriptions.size()), () -> assertFalse(reloadedSubscriptions.iterator().next().hasSubscriptionIdentifier()) diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java index 7a7221abe..fd0c42e12 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java @@ -15,7 +15,6 @@ */ package io.moquette.broker.subscriptions; - import io.moquette.broker.subscriptions.CTrie.SubscriptionRequest; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubscriptionOption; @@ -26,13 +25,11 @@ import static io.moquette.broker.subscriptions.SubscriptionTestUtils.asSubscription; import static io.moquette.broker.subscriptions.Topic.asTopic; -import java.util.Collection; -import java.util.List; -import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import org.pcollections.PMap; public class CTrieTest { @@ -53,15 +50,15 @@ public void testAddOnSecondLayerWithEmptyTokenOnEmptyTree() { final Optional matchedNode = sut.lookup(asTopic("/")); assertTrue(matchedNode.isPresent(), "Node on path / must be present"); //verify structure, only root INode and the first CNode should be present - assertThat(this.sut.root.mainNode().subscriptions()).isEmpty(); + assertThat(this.sut.root.mainNode().getSubscriptions()).isEmpty(); assertThat(this.sut.root.mainNode().allChildren()).isNotEmpty(); INode firstLayer = this.sut.root.mainNode().allChildren().stream().findFirst().get(); - assertThat(firstLayer.mainNode().subscriptions()).isEmpty(); + assertThat(firstLayer.mainNode().getSubscriptions()).isEmpty(); assertThat(firstLayer.mainNode().allChildren()).isNotEmpty(); INode secondLayer = firstLayer.mainNode().allChildren().stream().findFirst().get(); - assertThat(secondLayer.mainNode().subscriptions()).isNotEmpty(); + assertThat(secondLayer.mainNode().getSubscriptions()).isNotEmpty(); assertThat(secondLayer.mainNode().allChildren()).isEmpty(); } @@ -74,7 +71,7 @@ public void testAddFirstLayerNodeOnEmptyTree() { //Verify final Optional matchedNode = sut.lookup(asTopic("/temp")); assertTrue(matchedNode.isPresent(), "Node on path /temp must be present"); - assertFalse(matchedNode.get().subscriptions().isEmpty()); + assertFalse(matchedNode.get().getSubscriptions().isEmpty()); } @Test @@ -101,8 +98,8 @@ public void testAddNewSubscriptionOnExistingNode() { //Verify final Optional matchedNode = sut.lookup(asTopic("/temp")); assertTrue(matchedNode.isPresent(), "Node on path /temp must be present"); - final Collection subscriptions = matchedNode.get().subscriptions(); - assertTrue(subscriptions.contains(asSubscription("TempSensor2", "/temp"))); + final PMap subscriptions = matchedNode.get().getSubscriptions(); + assertTrue(subscriptions.containsValue(asSubscription("TempSensor2", "/temp"))); } @Test @@ -119,8 +116,8 @@ public void testAddNewDeepNodes() { //Verify final Optional matchedNode = sut.lookup(asTopic("/italy/happiness")); assertTrue(matchedNode.isPresent(), "Node on path /italy/happiness must be present"); - final Collection subscriptions = matchedNode.get().subscriptions(); - assertTrue(subscriptions.contains(asSubscription("HappinessSensor", "/italy/happiness"))); + final PMap subscriptions = matchedNode.get().getSubscriptions(); + assertTrue(subscriptions.containsValue(asSubscription("HappinessSensor", "/italy/happiness"))); } static SubscriptionRequest clientSubOnTopic(String clientID, String topicFilter) { @@ -193,7 +190,7 @@ public void givenTreeWithSomeNodeHierarchyWhenRemoveContainedSubscriptionThenNod sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor1", asTopic("/temp/1"))); sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor1", asTopic("/temp/1"))); - final List matchingSubs = sut.recursiveMatch(asTopic("/temp/2")); + final SubscriptionCollection matchingSubs = sut.recursiveMatch(asTopic("/temp/2")); //Verify final Subscription expectedMatchingsub = new Subscription("TempSensor1", asTopic("/temp/2"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); @@ -210,8 +207,8 @@ public void givenTreeWithSomeNodeHierarchWhenRemoveContainedSubscriptionSmallerT //Exercise sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor1", asTopic("/temp"))); - final List matchingSubs1 = sut.recursiveMatch(asTopic("/temp/1")); - final List matchingSubs2 = sut.recursiveMatch(asTopic("/temp/2")); + final SubscriptionCollection matchingSubs1 = sut.recursiveMatch(asTopic("/temp/1")); + final SubscriptionCollection matchingSubs2 = sut.recursiveMatch(asTopic("/temp/2")); //Verify // not clear to me, but I believe /temp unsubscribe should not unsub you from downstream /temp/1 or /temp/2 @@ -239,7 +236,7 @@ public void testMatchSubscriptionNoWildcards() { sut.addToTree(newSubscription); //Exercise - final List matchingSubs = sut.recursiveMatch(asTopic("/temp")); + final SubscriptionCollection matchingSubs = sut.recursiveMatch(asTopic("/temp")); //Verify final Subscription expectedMatchingsub = new Subscription("TempSensor1", asTopic("/temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); @@ -254,8 +251,8 @@ public void testRemovalInnerTopicOffRootSameClient() { sut.addToTree(newSubscription); //Exercise - final List matchingSubs1 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs1 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); //Verify final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); @@ -267,8 +264,8 @@ public void testRemovalInnerTopicOffRootSameClient() { sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor1", asTopic("temp"))); //Exercise - final List matchingSubs3 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs3 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); assertThat(matchingSubs3).doesNotContain(expectedMatchingsub1); assertThat(matchingSubs4).contains(expectedMatchingsub2); @@ -282,8 +279,8 @@ public void testRemovalInnerTopicOffRootDiffClient() { sut.addToTree(newSubscription); //Exercise - final List matchingSubs1 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs1 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); //Verify final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); @@ -295,8 +292,8 @@ public void testRemovalInnerTopicOffRootDiffClient() { sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor1", asTopic("temp"))); //Exercise - final List matchingSubs3 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs3 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); assertThat(matchingSubs3).doesNotContain(expectedMatchingsub1); assertThat(matchingSubs4).contains(expectedMatchingsub2); @@ -310,8 +307,8 @@ public void testRemovalOuterTopicOffRootDiffClient() { sut.addToTree(newSubscription); //Exercise - final List matchingSubs1 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs1 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); //Verify final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); @@ -323,8 +320,8 @@ public void testRemovalOuterTopicOffRootDiffClient() { sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor2", asTopic("temp/1"))); //Exercise - final List matchingSubs3 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs3 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); assertThat(matchingSubs3).contains(expectedMatchingsub1); assertThat(matchingSubs4).doesNotContain(expectedMatchingsub2); diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java index 1eafab0ad..12ea43c8d 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java @@ -35,9 +35,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RequestResponseTest extends AbstractServerIntegrationWithoutClientFixture { + private static final Logger LOGGER = LoggerFactory.getLogger(RequestResponseTest.class.getName()); + @Test public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReply() throws InterruptedException { final Mqtt5BlockingClient requester = createHiveBlockingClient("requester"); @@ -60,9 +64,11 @@ private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient respo .topicFilter("requester/door/open") .qos(MqttQos.AT_LEAST_ONCE) .build(); + LOGGER.info("Subscribing to on requester/door/open"); responder.toAsync().subscribe(subscribeToRequest, (Mqtt5Publish pub) -> { assertTrue(pub.getResponseTopic().isPresent(), "Response topic MUST defined in request publish"); + LOGGER.info("Responding on {}", pub.getResponseTopic().get()); Mqtt5PublishResult responseResult = responder.publishWith() .topic(pub.getResponseTopic().get()) .payload("OK".getBytes(StandardCharsets.UTF_8))