Skip to content

Commit

Permalink
Adds debug-enabled check for sessions loop assignments (#714)
Browse files Browse the repository at this point in the history
Adds a check to verify that current session loop thread is the expected one. This is useful to investigate eventual problems with session loops.

This feature could be explicitly enable setting the moquette.session_loop.debug environment variable with the -D JVM flag.
  • Loading branch information
andsel authored Feb 3, 2023
1 parent a6bed14 commit d7272ec
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 6 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.17-SNAPSHOT:
[feature] add `moquette.session_loop.debug` property to enable session loop checking assignments (#714).
[break] deprecate `persistent_store` to separate the enablement of persistence with `persistence_enabled` and the path `data_path` (#706).
[enhancement] introduced new queue implementation based on segments in memory mapped files. The type of queue implementation
could be selected by setting `persistent_queue_type` (#691, #704).
Expand Down
37 changes: 33 additions & 4 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ final class MQTTConnection {

private static final Logger LOG = LoggerFactory.getLogger(MQTTConnection.class);

static final boolean sessionLoopDebug = Boolean.parseBoolean(System.getProperty("moquette.session_loop.debug", "false"));

final Channel channel;
private final BrokerConfiguration brokerConfig;
private final IAuthenticator authenticator;
Expand Down Expand Up @@ -108,15 +110,19 @@ void handleMessage(MqttMessage msg) {

private void processPubComp(MqttMessage msg) {
final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
this.postOffice.routeCommand(bindedSession.getClientID(), "PUBCOMP", () -> {
final String clientID = bindedSession.getClientID();
this.postOffice.routeCommand(clientID, "PUBCOMP", () -> {
checkMatchSessionLoop(clientID);
bindedSession.processPubComp(messageID);
return bindedSession.getClientID();
return clientID;
});
}

private void processPubRec(MqttMessage msg) {
final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
this.postOffice.routeCommand(bindedSession.getClientID(), "PUBREC", () -> {
final String clientID = bindedSession.getClientID();
this.postOffice.routeCommand(clientID, "PUBREC", () -> {
checkMatchSessionLoop(clientID);
bindedSession.processPubRec(messageID);
return null;
});
Expand All @@ -131,6 +137,7 @@ private void processPubAck(MqttMessage msg) {
final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
final String clientId = getClientId();
this.postOffice.routeCommand(clientId, "PUB ACK", () -> {
checkMatchSessionLoop(clientId);
bindedSession.pubAckReceived(messageID);
return null;
});
Expand Down Expand Up @@ -174,11 +181,23 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) {

final String sessionId = clientId;
return postOffice.routeCommand(clientId, "CONN", () -> {
checkMatchSessionLoop(sessionId);
executeConnect(msg, sessionId);
return null;
});
}

private void checkMatchSessionLoop(String clientId) {
if (!sessionLoopDebug) {
return;
}
final String currentThreadName = Thread.currentThread().getName();
final String expectedThreadName = postOffice.sessionLoopThreadName(clientId);
if (!expectedThreadName.equals(currentThreadName)) {
throw new IllegalStateException("Expected to be executed on thread " + expectedThreadName + " but running on " + currentThreadName + ". This means a programming error");
}
}

/**
* Invoked by the Session's event loop.
* */
Expand Down Expand Up @@ -304,6 +323,7 @@ void handleConnectionLost() {
// this must not be done on the netty thread
LOG.debug("Notifying connection lost event");
postOffice.routeCommand(clientID, "CONN LOST", () -> {
checkMatchSessionLoop(clientID);
if (isBoundToSession() || isSessionUnbound()) {
LOG.debug("Cleaning {}", clientID);
processConnectionLost(clientID);
Expand All @@ -314,6 +334,7 @@ void handleConnectionLost() {
});
}

// Invoked when a TCP connection drops and not when a client send DISCONNECT and close.
private void processConnectionLost(String clientID) {
if (bindedSession.hasWill()) {
postOffice.fireWill(bindedSession.getWill());
Expand Down Expand Up @@ -348,6 +369,7 @@ PostOffice.RouteResult processDisconnect(MqttMessage msg) {
}

return this.postOffice.routeCommand(clientID, "DISCONN", () -> {
checkMatchSessionLoop(clientID);
if (!isBoundToSession()) {
LOG.debug("NOT processing disconnect {}, not bound.", clientID);
return null;
Expand All @@ -371,6 +393,7 @@ PostOffice.RouteResult processSubscribe(MqttSubscribeMessage msg) {
}
final String username = NettyUtils.userName(channel);
return postOffice.routeCommand(clientID, "SUB", () -> {
checkMatchSessionLoop(clientID);
if (isBoundToSession())
postOffice.subscribeClientToTopics(msg, clientID, username, this);
return null;
Expand All @@ -388,6 +411,7 @@ private void processUnsubscribe(MqttUnsubscribeMessage msg) {
final int messageId = msg.variableHeader().messageId();

postOffice.routeCommand(clientID, "UNSUB", () -> {
checkMatchSessionLoop(clientID);
if (!isBoundToSession())
return null;
LOG.trace("Processing UNSUBSCRIBE message. topics: {}", topics);
Expand Down Expand Up @@ -425,20 +449,23 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
switch (qos) {
case AT_MOST_ONCE:
return postOffice.routeCommand(clientId, "PUB QoS0", () -> {
checkMatchSessionLoop(clientId);
if (!isBoundToSession())
return null;
postOffice.receivedPublishQos0(topic, username, clientId, msg);
return null;
}).ifFailed(msg::release);
case AT_LEAST_ONCE:
return postOffice.routeCommand(clientId, "PUB QoS1", () -> {
checkMatchSessionLoop(clientId);
if (!isBoundToSession())
return null;
postOffice.receivedPublishQos1(this, topic, username, messageID, msg);
return null;
}).ifFailed(msg::release);
case EXACTLY_ONCE: {
final PostOffice.RouteResult firstStepResult = postOffice.routeCommand(clientId, "PUB QoS2", () -> {
checkMatchSessionLoop(clientId);
if (!isBoundToSession())
return null;
bindedSession.receivedPublishQos2(messageID, msg);
Expand Down Expand Up @@ -470,7 +497,9 @@ void sendPubRec(int messageID) {

private void processPubRel(MqttMessage msg) {
final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
postOffice.routeCommand(bindedSession.getClientID(), "PUBREL", () -> {
final String clientID = bindedSession.getClientID();
postOffice.routeCommand(clientID, "PUBREL", () -> {
checkMatchSessionLoop(clientID);
executePubRel(messageID);
return null;
});
Expand Down
17 changes: 15 additions & 2 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,15 @@ public RouteResult ifFailed(Runnable action) {
this.sessionExecutors = new Thread[eventLoops];
for (int i = 0; i < eventLoops; i++) {
this.sessionExecutors[i] = new Thread(new SessionEventLoop(this.sessionQueues[i]));
this.sessionExecutors[i].setName("Session Executor " + i);
this.sessionExecutors[i].setName(sessionLoopName(i));
this.sessionExecutors[i].start();
}
}

private String sessionLoopName(int i) {
return "Session Executor " + i;
}

public void init(SessionRegistry sessionRegistry) {
this.sessionRegistry = sessionRegistry;
}
Expand Down Expand Up @@ -624,12 +628,21 @@ void dispatchConnectionLost(String clientId,String userName) {
interceptor.notifyClientConnectionLost(clientId, userName);
}

String sessionLoopThreadName(String clientId) {
final int targetQueueId = targetQueueOrdinal(clientId);
return sessionLoopName(targetQueueId);
}

private int targetQueueOrdinal(String clientId) {
return Math.abs(clientId.hashCode()) % this.eventLoops;
}

/**
* Route the command to the owning SessionEventLoop
* */
public RouteResult routeCommand(String clientId, String actionDescription, Callable<String> action) {
SessionCommand cmd = new SessionCommand(clientId, action);
final int targetQueueId = Math.abs(cmd.getSessionId().hashCode()) % this.eventLoops;
final int targetQueueId = targetQueueOrdinal(cmd.getSessionId());
LOG.debug("Routing cmd [{}] for session [{}] to event processor {}", actionDescription, cmd.getSessionId(), targetQueueId);
final FutureTask<String> task = new FutureTask<>(() -> {
cmd.execute();
Expand Down

0 comments on commit d7272ec

Please sign in to comment.