Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bubble up session loop's exceptions #736

Merged
merged 4 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,15 @@
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.interception.messages.InterceptExceptionMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.*;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -178,10 +173,11 @@ public RouteResult ifFailed(Runnable action) {
private SessionRegistry sessionRegistry;
private BrokerInterceptor interceptor;

private final Thread[] sessionExecutors;
private final SessionEventLoop[] sessionExecutors;
private final BlockingQueue<FutureTask<String>>[] sessionQueues;
private final int eventLoops = Runtime.getRuntime().availableProcessors();
private final FailedPublishCollection failedPublishes = new FailedPublishCollection();
private final ConcurrentMap<String, Throwable> loopThrownExceptions = new ConcurrentHashMap<>();

PostOffice(ISubscriptionsDirectory subscriptions, IRetainedRepository retainedRepository,
SessionRegistry sessionRegistry, BrokerInterceptor interceptor, Authorizator authorizator, int sessionQueueSize) {
Expand All @@ -195,11 +191,20 @@ public RouteResult ifFailed(Runnable action) {
for (int i = 0; i < eventLoops; i++) {
this.sessionQueues[i] = new ArrayBlockingQueue<>(sessionQueueSize);
}
this.sessionExecutors = new Thread[eventLoops];
this.sessionExecutors = new SessionEventLoop[eventLoops];
for (int i = 0; i < eventLoops; i++) {
this.sessionExecutors[i] = new Thread(new SessionEventLoop(this.sessionQueues[i]));
this.sessionExecutors[i].setName(sessionLoopName(i));
this.sessionExecutors[i].start();
SessionEventLoop newLoop = new SessionEventLoop(this.sessionQueues[i]);
newLoop.setName(sessionLoopName(i));
newLoop.setUncaughtExceptionHandler((loopThread, ex) -> {
// executed in session loop thread
// collect the exception thrown to later re-throw
loopThrownExceptions.put(loopThread.getName(), ex);

// This is done in asynch from another thread in BrokerInterceptor
interceptor.notifyLoopException(new InterceptExceptionMessage(ex));
});
newLoop.start();
this.sessionExecutors[i] = newLoop;
}
}

Expand Down Expand Up @@ -663,16 +668,22 @@ public RouteResult routeCommand(String clientId, String actionDescription, Calla
}

public void terminate() {
for (Thread processor : sessionExecutors) {
for (SessionEventLoop processor : sessionExecutors) {
processor.interrupt();
}
for (Thread processor : sessionExecutors) {
for (SessionEventLoop processor : sessionExecutors) {
try {
processor.join(5_000);
} catch (InterruptedException ex) {
LOG.info("Interrupted while joining session event loop {}", processor.getName(), ex);
}
}

for (Map.Entry<String, Throwable> loopThrownExceptionEntry : loopThrownExceptions.entrySet()) {
String threadName = loopThrownExceptionEntry.getKey();
Throwable threadError = loopThrownExceptionEntry.getValue();
LOG.error("Session event loop {} terminated with error", threadName, threadError);
}
}

/**
Expand Down
5 changes: 3 additions & 2 deletions broker/src/main/java/io/moquette/broker/SessionEventLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.FutureTask;

final class SessionEventLoop implements Runnable {
final class SessionEventLoop extends Thread {

private static final Logger LOG = LoggerFactory.getLogger(SessionEventLoop.class);

Expand Down Expand Up @@ -48,7 +48,8 @@ public static void executeTask(final FutureTask<String> task) {
// we ran it, but we have to grab the exception if raised
task.get();
} catch (Throwable th) {
LOG.info("SessionEventLoop {} reached exception in processing command", Thread.currentThread().getName(), th);
LOG.warn("SessionEventLoop {} reached exception in processing command", Thread.currentThread().getName(), th);
throw new RuntimeException(th);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.moquette.interception.messages.InterceptUnsubscribeMessage;

/**
* Basic abstract class usefull to avoid empty methods creation in subclasses.
* Basic abstract class useful to avoid empty methods creation in subclasses.
*/
public abstract class AbstractInterceptHandler implements InterceptHandler {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ public void notifyMessageAcknowledged(final InterceptAcknowledgedMessage msg) {
}
}

@Override
public void notifyLoopException(InterceptExceptionMessage msg) {
for (final InterceptHandler handler : this.handlers.get(InterceptExceptionMessage.class)) {
handler.onSessionLoopError(msg.getError());
}
}

@Override
public void addInterceptHandler(InterceptHandler interceptHandler) {
Class<?>[] interceptedMessageTypes = getInterceptedMessageTypes(interceptHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

/**
* This interface is used to inject code for intercepting broker events.
* This is part of the API that integrator of Moquette has to implement.
* <p>
* The events can act only as observers.
* <p>
Expand All @@ -33,7 +34,7 @@ public interface InterceptHandler {

Class<?>[] ALL_MESSAGE_TYPES = {InterceptConnectMessage.class, InterceptDisconnectMessage.class,
InterceptConnectionLostMessage.class, InterceptPublishMessage.class, InterceptSubscribeMessage.class,
InterceptUnsubscribeMessage.class, InterceptAcknowledgedMessage.class};
InterceptUnsubscribeMessage.class, InterceptAcknowledgedMessage.class, InterceptExceptionMessage.class};

/**
* @return the identifier of this intercept handler.
Expand Down Expand Up @@ -65,4 +66,6 @@ public interface InterceptHandler {
void onUnsubscribe(InterceptUnsubscribeMessage msg);

void onMessageAcknowledged(InterceptAcknowledgedMessage msg);

void onSessionLoopError(Throwable error);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.moquette.interception.messages.InterceptAcknowledgedMessage;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.interception.messages.InterceptExceptionMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;

Expand Down Expand Up @@ -47,6 +48,8 @@ public interface Interceptor {

void notifyMessageAcknowledged(InterceptAcknowledgedMessage msg);

void notifyLoopException(InterceptExceptionMessage th);

void addInterceptHandler(InterceptHandler interceptHandler);

void removeInterceptHandler(InterceptHandler interceptHandler);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.moquette.interception.messages;

public class InterceptExceptionMessage implements InterceptMessage {
private Throwable error;

public InterceptExceptionMessage(Throwable error) {
this.error = error;
}

public Throwable getError() {
return error;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;
import org.jetbrains.annotations.NotNull;
//import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -85,7 +85,7 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel
return new MQTTConnection(channel, config, mockAuthenticator, sessionRegistry, postOffice);
}

@NotNull
// @NotNull
static ISessionsRepository memorySessionsRepository() {
return new MemorySessionsRepository();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public void onUnsubscribe(InterceptUnsubscribeMessage msg) {
public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) {
n.set(90);
}

@Override
public void onSessionLoopError(Throwable error) {
throw new RuntimeException(error);
}
}

private static final BrokerInterceptor interceptor = new BrokerInterceptor(
Expand Down