diff --git a/ChangeLog.txt b/ChangeLog.txt index 34354cf63..64cd0c7ed 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,5 @@ Version 0.17-SNAPSHOT: + [feature] add `moquette.session_loop.debug` property to enable session loop checking assignments (#714). [break] deprecate `persistent_store` to separate the enablement of persistence with `persistence_enabled` and the path `data_path` (#706). [enhancement] introduced new queue implementation based on segments in memory mapped files. The type of queue implementation could be selected by setting `persistent_queue_type` (#691, #704). diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index c921316a8..6b0f152b4 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -44,6 +44,8 @@ final class MQTTConnection { private static final Logger LOG = LoggerFactory.getLogger(MQTTConnection.class); + static final boolean sessionLoopDebug = Boolean.parseBoolean(System.getProperty("moquette.session_loop.debug", "false")); + final Channel channel; private final BrokerConfiguration brokerConfig; private final IAuthenticator authenticator; @@ -108,15 +110,19 @@ void handleMessage(MqttMessage msg) { private void processPubComp(MqttMessage msg) { final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId(); - this.postOffice.routeCommand(bindedSession.getClientID(), "PUBCOMP", () -> { + final String clientID = bindedSession.getClientID(); + this.postOffice.routeCommand(clientID, "PUBCOMP", () -> { + checkMatchSessionLoop(clientID); bindedSession.processPubComp(messageID); - return bindedSession.getClientID(); + return clientID; }); } private void processPubRec(MqttMessage msg) { final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId(); - this.postOffice.routeCommand(bindedSession.getClientID(), "PUBREC", () -> { + final String clientID = bindedSession.getClientID(); + this.postOffice.routeCommand(clientID, "PUBREC", () -> { + checkMatchSessionLoop(clientID); bindedSession.processPubRec(messageID); return null; }); @@ -131,6 +137,7 @@ private void processPubAck(MqttMessage msg) { final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId(); final String clientId = getClientId(); this.postOffice.routeCommand(clientId, "PUB ACK", () -> { + checkMatchSessionLoop(clientId); bindedSession.pubAckReceived(messageID); return null; }); @@ -174,11 +181,23 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) { final String sessionId = clientId; return postOffice.routeCommand(clientId, "CONN", () -> { + checkMatchSessionLoop(sessionId); executeConnect(msg, sessionId); return null; }); } + private void checkMatchSessionLoop(String clientId) { + if (!sessionLoopDebug) { + return; + } + final String currentThreadName = Thread.currentThread().getName(); + final String expectedThreadName = postOffice.sessionLoopThreadName(clientId); + if (!expectedThreadName.equals(currentThreadName)) { + throw new IllegalStateException("Expected to be executed on thread " + expectedThreadName + " but running on " + currentThreadName + ". This means a programming error"); + } + } + /** * Invoked by the Session's event loop. * */ @@ -304,6 +323,7 @@ void handleConnectionLost() { // this must not be done on the netty thread LOG.debug("Notifying connection lost event"); postOffice.routeCommand(clientID, "CONN LOST", () -> { + checkMatchSessionLoop(clientID); if (isBoundToSession() || isSessionUnbound()) { LOG.debug("Cleaning {}", clientID); processConnectionLost(clientID); @@ -314,6 +334,7 @@ void handleConnectionLost() { }); } + // Invoked when a TCP connection drops and not when a client send DISCONNECT and close. private void processConnectionLost(String clientID) { if (bindedSession.hasWill()) { postOffice.fireWill(bindedSession.getWill()); @@ -348,6 +369,7 @@ PostOffice.RouteResult processDisconnect(MqttMessage msg) { } return this.postOffice.routeCommand(clientID, "DISCONN", () -> { + checkMatchSessionLoop(clientID); if (!isBoundToSession()) { LOG.debug("NOT processing disconnect {}, not bound.", clientID); return null; @@ -371,6 +393,7 @@ PostOffice.RouteResult processSubscribe(MqttSubscribeMessage msg) { } final String username = NettyUtils.userName(channel); return postOffice.routeCommand(clientID, "SUB", () -> { + checkMatchSessionLoop(clientID); if (isBoundToSession()) postOffice.subscribeClientToTopics(msg, clientID, username, this); return null; @@ -388,6 +411,7 @@ private void processUnsubscribe(MqttUnsubscribeMessage msg) { final int messageId = msg.variableHeader().messageId(); postOffice.routeCommand(clientID, "UNSUB", () -> { + checkMatchSessionLoop(clientID); if (!isBoundToSession()) return null; LOG.trace("Processing UNSUBSCRIBE message. topics: {}", topics); @@ -425,6 +449,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { switch (qos) { case AT_MOST_ONCE: return postOffice.routeCommand(clientId, "PUB QoS0", () -> { + checkMatchSessionLoop(clientId); if (!isBoundToSession()) return null; postOffice.receivedPublishQos0(topic, username, clientId, msg); @@ -432,6 +457,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { }).ifFailed(msg::release); case AT_LEAST_ONCE: return postOffice.routeCommand(clientId, "PUB QoS1", () -> { + checkMatchSessionLoop(clientId); if (!isBoundToSession()) return null; postOffice.receivedPublishQos1(this, topic, username, messageID, msg); @@ -439,6 +465,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { }).ifFailed(msg::release); case EXACTLY_ONCE: { final PostOffice.RouteResult firstStepResult = postOffice.routeCommand(clientId, "PUB QoS2", () -> { + checkMatchSessionLoop(clientId); if (!isBoundToSession()) return null; bindedSession.receivedPublishQos2(messageID, msg); @@ -470,7 +497,9 @@ void sendPubRec(int messageID) { private void processPubRel(MqttMessage msg) { final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId(); - postOffice.routeCommand(bindedSession.getClientID(), "PUBREL", () -> { + final String clientID = bindedSession.getClientID(); + postOffice.routeCommand(clientID, "PUBREL", () -> { + checkMatchSessionLoop(clientID); executePubRel(messageID); return null; }); diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index b432b2696..b1870e4cc 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -198,11 +198,15 @@ public RouteResult ifFailed(Runnable action) { this.sessionExecutors = new Thread[eventLoops]; for (int i = 0; i < eventLoops; i++) { this.sessionExecutors[i] = new Thread(new SessionEventLoop(this.sessionQueues[i])); - this.sessionExecutors[i].setName("Session Executor " + i); + this.sessionExecutors[i].setName(sessionLoopName(i)); this.sessionExecutors[i].start(); } } + private String sessionLoopName(int i) { + return "Session Executor " + i; + } + public void init(SessionRegistry sessionRegistry) { this.sessionRegistry = sessionRegistry; } @@ -624,12 +628,21 @@ void dispatchConnectionLost(String clientId,String userName) { interceptor.notifyClientConnectionLost(clientId, userName); } + String sessionLoopThreadName(String clientId) { + final int targetQueueId = targetQueueOrdinal(clientId); + return sessionLoopName(targetQueueId); + } + + private int targetQueueOrdinal(String clientId) { + return Math.abs(clientId.hashCode()) % this.eventLoops; + } + /** * Route the command to the owning SessionEventLoop * */ public RouteResult routeCommand(String clientId, String actionDescription, Callable action) { SessionCommand cmd = new SessionCommand(clientId, action); - final int targetQueueId = Math.abs(cmd.getSessionId().hashCode()) % this.eventLoops; + final int targetQueueId = targetQueueOrdinal(cmd.getSessionId()); LOG.debug("Routing cmd [{}] for session [{}] to event processor {}", actionDescription, cmd.getSessionId(), targetQueueId); final FutureTask task = new FutureTask<>(() -> { cmd.execute();