Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 841 Part-2: C-Tree deepening #849

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[fix] Incorrect reference used in compareAndSet in CTrie.cleanTomb. (#841)
[feature] Implement response-information property for request-response flow. (#840)
[fix] Optimised page file opening for disk-based queues. (#837)
[feature] Manage payload format indicator property, when set verify payload format. (#826)
Expand Down
6 changes: 6 additions & 0 deletions broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@
<version>1.15</version>
</dependency>

<dependency>
<groupId>org.pcollections</groupId>
<artifactId>pcollections</artifactId>
<version>4.0.2</version>
</dependency>

<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
Expand Down
3 changes: 2 additions & 1 deletion broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.stream.Collectors;

import static io.moquette.broker.Utils.messageId;
import io.moquette.broker.subscriptions.SubscriptionCollection;
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE;
Expand Down Expand Up @@ -842,7 +843,7 @@ private RoutingResults publish2Subscribers(String publisherClientId,
final boolean retainPublish = msg.fixedHeader().isRetain();
final Topic topic = new Topic(msg.variableHeader().topicName());
final MqttQoS publishingQos = msg.fixedHeader().qosLevel();
List<Subscription> topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic);
SubscriptionCollection topicMatchingSubscriptions = subscriptions.matchWithoutQosSharpening(topic);
if (topicMatchingSubscriptions.isEmpty()) {
// no matching subscriptions, clean exit
LOG.trace("No matching subscriptions for topic: {}", topic);
Expand Down
121 changes: 42 additions & 79 deletions broker/src/main/java/io/moquette/broker/subscriptions/CNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -33,52 +34,48 @@
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import org.pcollections.PMap;
import org.pcollections.TreePMap;

class CNode implements Comparable<CNode> {

public static final Random SECURE_RANDOM = new SecureRandom();
private final Token token;
private final List<INode> children;
// Sorted list of subscriptions. The sort is necessary for fast access, instead of linear scan.
private List<Subscription> subscriptions;
private PMap<String, INode> children;
// Map of subscriptions per clientId.
private PMap<String, Subscription> subscriptions;
// the list of SharedSubscription is sorted. The sort is necessary for fast access, instead of linear scan.
private Map<ShareName, List<SharedSubscription>> sharedSubscriptions;
private PMap<ShareName, List<SharedSubscription>> sharedSubscriptions;

CNode(Token token) {
this.children = new ArrayList<>();
this.subscriptions = new ArrayList<>();
this.sharedSubscriptions = new HashMap<>();
this.children = TreePMap.empty();
this.subscriptions = TreePMap.empty();
this.sharedSubscriptions = TreePMap.empty();
this.token = token;
}

//Copy constructor
private CNode(Token token, List<INode> children, List<Subscription> subscriptions, Map<ShareName,
List<SharedSubscription>> sharedSubscriptions) {
private CNode(Token token, PMap<String, INode> children, PMap<String, Subscription> subscriptions, PMap<ShareName, List<SharedSubscription>> sharedSubscriptions) {
this.token = token; // keep reference, root comparison in directory logic relies on it for now.
this.subscriptions = new ArrayList<>(subscriptions);
this.sharedSubscriptions = new HashMap<>(sharedSubscriptions);
this.children = new ArrayList<>(children);
this.subscriptions = subscriptions;
this.sharedSubscriptions = sharedSubscriptions;
this.children = children;
}

public Token getToken() {
return token;
}

List<INode> allChildren() {
return new ArrayList<>(this.children);
Collection<INode> allChildren() {
return this.children.values();
}

Optional<INode> childOf(Token token) {
int idx = findIndexForToken(token);
if (idx < 0) {
INode value = children.get(token.name);
if (value == null) {
return Optional.empty();
}
return Optional.of(children.get(idx));
}

private int findIndexForToken(Token token) {
final INode tempTokenNode = new INode(new CNode(token));
return Collections.binarySearch(children, tempTokenNode, (INode node, INode tokenHolder) -> node.mainNode().token.compareTo(tokenHolder.mainNode().token));
return Optional.of(value);
}

@Override
Expand All @@ -91,35 +88,23 @@ CNode copy() {
}

public void add(INode newINode) {
int idx = findIndexForToken(newINode.mainNode().token);
if (idx < 0) {
children.add(-1 - idx, newINode);
} else {
children.add(idx, newINode);
}
final String tokenName = newINode.mainNode().token.name;
children = children.plus(tokenName, newINode);
}

public void remove(INode node) {
int idx = findIndexForToken(node.mainNode().token);
this.children.remove(idx);
public INode remove(INode node) {
final String tokenName = node.mainNode().token.name;
INode toRemove = children.get(tokenName);
children = children.minus(tokenName);
return toRemove;
}

private List<Subscription> sharedSubscriptions() {
List<Subscription> selectedSubscriptions = new ArrayList<>(sharedSubscriptions.size());
// for each sharedSubscription related to a ShareName, select one subscription
for (Map.Entry<ShareName, List<SharedSubscription>> subsForName : sharedSubscriptions.entrySet()) {
List<SharedSubscription> list = subsForName.getValue();
final String shareName = subsForName.getKey().getShareName();
// select a subscription randomly
int randIdx = SECURE_RANDOM.nextInt(list.size());
SharedSubscription sub = list.get(randIdx);
selectedSubscriptions.add(sub.createSubscription());
}
return selectedSubscriptions;
public PMap<String, Subscription> getSubscriptions() {
return subscriptions;
}

List<Subscription> subscriptions() {
return subscriptions;
public PMap<ShareName, List<SharedSubscription>> getSharedSubscriptions() {
return sharedSubscriptions;
}

// Mutating operation
Expand All @@ -141,25 +126,23 @@ CNode addSubscription(SubscriptionRequest request) {
final Subscription newSubscription = request.subscription();

// if already contains one with same topic and same client, keep that with higher QoS
int idx = Collections.binarySearch(subscriptions, newSubscription);
if (idx >= 0) {
final Subscription existing = subscriptions.get(newSubscription.clientId);
if (existing != null) {
// Subscription already exists
final Subscription existing = subscriptions.get(idx);
if (needsToUpdateExistingSubscription(newSubscription, existing)) {
subscriptions.set(idx, newSubscription);
subscriptions = subscriptions.plus(newSubscription.clientId, newSubscription);
}
} else {
// insert into the expected index so that the sorting is maintained
this.subscriptions.add(-1 - idx, newSubscription);
subscriptions = subscriptions.plus(newSubscription.clientId, newSubscription);
}
}
return this;
}

private static boolean needsToUpdateExistingSubscription(Subscription newSubscription, Subscription existing) {
if ((newSubscription.hasSubscriptionIdentifier() && existing.hasSubscriptionIdentifier()) &&
newSubscription.getSubscriptionIdentifier().equals(existing.getSubscriptionIdentifier())
) {
if ((newSubscription.hasSubscriptionIdentifier() && existing.hasSubscriptionIdentifier())
&& newSubscription.getSubscriptionIdentifier().equals(existing.getSubscriptionIdentifier())) {
// if subscription identifier hasn't changed,
// then check QoS but don't lower the requested QoS level
return existing.option().qos().value() < newSubscription.option().qos().value();
Expand All @@ -177,8 +160,8 @@ private static boolean needsToUpdateExistingSubscription(Subscription newSubscri
* AND at least one subscription is actually present for that clientId
* */
boolean containsOnly(String clientId) {
for (Subscription sub : this.subscriptions) {
if (!sub.clientId.equals(clientId)) {
for (String sub : this.subscriptions.keySet()) {
if (!sub.equals(clientId)) {
return false;
}
}
Expand Down Expand Up @@ -207,12 +190,7 @@ private static SharedSubscription wrapKey(String clientId) {

//TODO this is equivalent to negate(containsOnly(clientId))
private boolean containsSubscriptionsForClient(String clientId) {
for (Subscription sub : this.subscriptions) {
if (sub.clientId.equals(clientId)) {
return true;
}
}
return false;
return subscriptions.containsKey(clientId);
}

void removeSubscriptionsFor(UnsubscribeRequest request) {
Expand All @@ -226,20 +204,12 @@ void removeSubscriptionsFor(UnsubscribeRequest request) {
subscriptionsForName.removeAll(toRemove);

if (subscriptionsForName.isEmpty()) {
this.sharedSubscriptions.remove(request.getSharedName());
sharedSubscriptions = sharedSubscriptions.minus(request.getSharedName());
} else {
this.sharedSubscriptions.replace(request.getSharedName(), subscriptionsForName);
sharedSubscriptions = sharedSubscriptions.plus(request.getSharedName(), subscriptionsForName);
}
} else {
// collect Subscription instances to remove
Set<Subscription> toRemove = new HashSet<>();
for (Subscription sub : this.subscriptions) {
if (sub.clientId.equals(clientId)) {
toRemove.add(sub);
}
}
// effectively remove the instances
this.subscriptions.removeAll(toRemove);
subscriptions = subscriptions.minus(clientId);
}
}

Expand All @@ -248,11 +218,4 @@ public int compareTo(CNode o) {
return token.compareTo(o.token);
}

public List<Subscription> sharedAndNonSharedSubscriptions() {
List<Subscription> shared = sharedSubscriptions();
List<Subscription> returnedSubscriptions = new ArrayList<>(subscriptions.size() + shared.size());
returnedSubscriptions.addAll(subscriptions);
returnedSubscriptions.addAll(shared);
return returnedSubscriptions;
}
}
58 changes: 38 additions & 20 deletions broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public SubscriptionIdentifier getSubscriptionIdentifier() {
* Models a request to unsubscribe a client, it's carrier for the Subscription
* */
public final static class UnsubscribeRequest {

private final Topic topicFilter;
private final String clientId;
private boolean shared = false;
Expand Down Expand Up @@ -231,44 +232,56 @@ private NavigationAction evaluate(Topic topicName, CNode cnode, int depth) {
return NavigationAction.STOP;
}

public List<Subscription> recursiveMatch(Topic topicName) {
return recursiveMatch(topicName, this.root, 0);
public SubscriptionCollection recursiveMatch(Topic topicName) {
SubscriptionCollection subscriptions = new SubscriptionCollection();
recursiveMatch(topicName, this.root, 0, subscriptions);
return subscriptions;
}

private List<Subscription> recursiveMatch(Topic topicName, INode inode, int depth) {
private void recursiveMatch(Topic topicName, INode inode, int depth, SubscriptionCollection target) {
CNode cnode = inode.mainNode();
if (cnode instanceof TNode) {
return Collections.emptyList();
return;
}
NavigationAction action = evaluate(topicName, cnode, depth);
if (action == NavigationAction.MATCH) {
return cnode.sharedAndNonSharedSubscriptions();
target.addNormalSubscriptions(cnode.getSubscriptions());
target.addSharedSubscriptions(cnode.getSharedSubscriptions());
return;
}
if (action == NavigationAction.STOP) {
return Collections.emptyList();
return;
}
Topic remainingTopic = (ROOT.equals(cnode.getToken())) ? topicName : topicName.exceptHeadToken();
List<Subscription> subscriptions = new ArrayList<>();
final boolean isRoot = ROOT.equals(cnode.getToken());
final boolean isSingle = Token.SINGLE.equals(cnode.getToken());
final boolean isMulti = Token.MULTI.equals(cnode.getToken());

Topic remainingTopic = isRoot
? topicName
: (isSingle || isMulti)
? topicName.exceptFullHeadToken()
: topicName.exceptHeadToken();
SubscriptionCollection subscriptions = new SubscriptionCollection();

// We should only consider the maximum three children children of
// type #, + or exact match
Optional<INode> subInode = cnode.childOf(Token.MULTI);
if (subInode.isPresent()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1));
recursiveMatch(remainingTopic, subInode.get(), depth + 1, target);
}
subInode = cnode.childOf(Token.SINGLE);
if (subInode.isPresent()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1));
recursiveMatch(remainingTopic, subInode.get(), depth + 1, target);
}
if (remainingTopic.isEmpty()) {
subscriptions.addAll(cnode.sharedAndNonSharedSubscriptions());
target.addNormalSubscriptions(cnode.getSubscriptions());
target.addSharedSubscriptions(cnode.getSharedSubscriptions());
} else {
subInode = cnode.childOf(remainingTopic.headToken());
if (subInode.isPresent()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1));
recursiveMatch(remainingTopic, subInode.get(), depth + 1, target);
}
}
return subscriptions;
}

/**
Expand Down Expand Up @@ -360,10 +373,7 @@ private Action remove(String clientId, Topic topic, INode inode, INode iParent,
}
}
if (cnode instanceof TNode) {
// this inode is a tomb, has no clients and should be cleaned up
// Because we implemented cleanTomb below, this should be rare, but possible
// Consider calling cleanTomb here too
return Action.OK;
return cleanTomb(inode, iParent);
}
if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
// last client to leave this node, AND there are no downstream children, remove via TNode tomb
Expand Down Expand Up @@ -393,9 +403,17 @@ private Action remove(String clientId, Topic topic, INode inode, INode iParent,
* @return REPEAT if this method wasn't successful or OK.
*/
private Action cleanTomb(INode inode, INode iParent) {
CNode updatedCnode = iParent.mainNode().copy();
updatedCnode.remove(inode);
return iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? Action.OK : Action.REPEAT;
CNode origCnode = iParent.mainNode();
CNode updatedCnode = origCnode.copy();
INode removed = updatedCnode.remove(inode);
if (removed == inode) {
return iParent.compareAndSet(origCnode, updatedCnode) ? Action.OK : Action.REPEAT;
} else {
// The node removed (from the copy!) was not the node we expected to remove.
// Probably because another thread replaced the TNode with a live node, so
// we don't need to clean it and can return success.
return Action.OK;
}
}

public int size() {
Expand Down
Loading