Skip to content

Commit

Permalink
Bubble session loops excpetions
Browse files Browse the repository at this point in the history
When a session loop the execption is simply logged and lost, so the error status is not reported to the broker.
This PR interrupt the session loop on an exception, saving the error in a local variable.

When the PostOffice terminates it re-throw the first (and last) exception that was reached during session loop
execution.
  • Loading branch information
andsel committed Mar 4, 2023
1 parent ba139db commit 9ac604d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
14 changes: 9 additions & 5 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public RouteResult ifFailed(Runnable action) {
private SessionRegistry sessionRegistry;
private BrokerInterceptor interceptor;

private final Thread[] sessionExecutors;
private final SessionEventLoop[] sessionExecutors;
private final BlockingQueue<FutureTask<String>>[] sessionQueues;
private final int eventLoops = Runtime.getRuntime().availableProcessors();
private final FailedPublishCollection failedPublishes = new FailedPublishCollection();
Expand All @@ -195,9 +195,9 @@ public RouteResult ifFailed(Runnable action) {
for (int i = 0; i < eventLoops; i++) {
this.sessionQueues[i] = new ArrayBlockingQueue<>(sessionQueueSize);
}
this.sessionExecutors = new Thread[eventLoops];
this.sessionExecutors = new SessionEventLoop[eventLoops];
for (int i = 0; i < eventLoops; i++) {
this.sessionExecutors[i] = new Thread(new SessionEventLoop(this.sessionQueues[i]));
this.sessionExecutors[i] = new SessionEventLoop(this.sessionQueues[i]);
this.sessionExecutors[i].setName(sessionLoopName(i));
this.sessionExecutors[i].start();
}
Expand Down Expand Up @@ -663,16 +663,20 @@ public RouteResult routeCommand(String clientId, String actionDescription, Calla
}

public void terminate() {
for (Thread processor : sessionExecutors) {
for (SessionEventLoop processor : sessionExecutors) {
processor.interrupt();
}
for (Thread processor : sessionExecutors) {
for (SessionEventLoop processor : sessionExecutors) {
try {
processor.join(5_000);
} catch (InterruptedException ex) {
LOG.info("Interrupted while joining session event loop {}", processor.getName(), ex);
}
}

for (SessionEventLoop processor : sessionExecutors) {
processor.failIfError();
}
}

/**
Expand Down
16 changes: 14 additions & 2 deletions broker/src/main/java/io/moquette/broker/SessionEventLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.FutureTask;

final class SessionEventLoop implements Runnable {
final class SessionEventLoop extends Thread {

private static final Logger LOG = LoggerFactory.getLogger(SessionEventLoop.class);

private final BlockingQueue<FutureTask<String>> sessionQueue;
private final boolean flushOnExit;

private RuntimeException interruptingError = null;

public SessionEventLoop(BlockingQueue<FutureTask<String>> sessionQueue) {
this(sessionQueue, true);
}
Expand All @@ -35,6 +37,9 @@ public void run() {
} catch (InterruptedException e) {
LOG.info("SessionEventLoop {} interrupted", Thread.currentThread().getName());
Thread.currentThread().interrupt();
} catch (RuntimeException th) {
interruptingError = th;
Thread.currentThread().interrupt();
}
}
LOG.info("SessionEventLoop {} exit", Thread.currentThread().getName());
Expand All @@ -48,8 +53,15 @@ public static void executeTask(final FutureTask<String> task) {
// we ran it, but we have to grab the exception if raised
task.get();
} catch (Throwable th) {
LOG.info("SessionEventLoop {} reached exception in processing command", Thread.currentThread().getName(), th);
LOG.warn("SessionEventLoop {} reached exception in processing command", Thread.currentThread().getName(), th);
throw new RuntimeException(th);
}
}
}

public void failIfError() {
if (interruptingError != null) {
throw interruptingError;
}
}
}

0 comments on commit 9ac604d

Please sign in to comment.