diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index b8d2c1ebe..38e3c5c83 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -198,6 +198,9 @@ public RouteResult ifFailed(Runnable action) { // executed in session loop thread // collect the exception thrown to later re-throw loopThrownExceptions.put(loopThread.getName(), ex); + + // This is done in asynch from another thread in BrokerInterceptor + interceptor.notifyLoopException(ex); }); newLoop.start(); this.sessionExecutors[i] = newLoop; diff --git a/broker/src/main/java/io/moquette/interception/AbstractInterceptHandler.java b/broker/src/main/java/io/moquette/interception/AbstractInterceptHandler.java index bd6e11f96..2af961d0a 100644 --- a/broker/src/main/java/io/moquette/interception/AbstractInterceptHandler.java +++ b/broker/src/main/java/io/moquette/interception/AbstractInterceptHandler.java @@ -25,7 +25,7 @@ import io.moquette.interception.messages.InterceptUnsubscribeMessage; /** - * Basic abstract class usefull to avoid empty methods creation in subclasses. + * Basic abstract class useful to avoid empty methods creation in subclasses. */ public abstract class AbstractInterceptHandler implements InterceptHandler { diff --git a/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java b/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java index 42bc8495a..50a6615b5 100644 --- a/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java +++ b/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java @@ -166,6 +166,13 @@ public void notifyMessageAcknowledged(final InterceptAcknowledgedMessage msg) { } } + @Override + public void notifyLoopException(Throwable th) { + for (final InterceptHandler handler : this.handlers.get(InterceptExceptionMessage.class)) { + + } + } + @Override public void addInterceptHandler(InterceptHandler interceptHandler) { Class[] interceptedMessageTypes = getInterceptedMessageTypes(interceptHandler); diff --git a/broker/src/main/java/io/moquette/interception/InterceptHandler.java b/broker/src/main/java/io/moquette/interception/InterceptHandler.java index c5dde510e..72963d3b9 100644 --- a/broker/src/main/java/io/moquette/interception/InterceptHandler.java +++ b/broker/src/main/java/io/moquette/interception/InterceptHandler.java @@ -22,6 +22,7 @@ /** * This interface is used to inject code for intercepting broker events. + * This is part of the API that integrator of Moquette has to implement. *

* The events can act only as observers. *

@@ -33,7 +34,7 @@ public interface InterceptHandler { Class[] ALL_MESSAGE_TYPES = {InterceptConnectMessage.class, InterceptDisconnectMessage.class, InterceptConnectionLostMessage.class, InterceptPublishMessage.class, InterceptSubscribeMessage.class, - InterceptUnsubscribeMessage.class, InterceptAcknowledgedMessage.class}; + InterceptUnsubscribeMessage.class, InterceptAcknowledgedMessage.class, InterceptExceptionMessage.class}; /** * @return the identifier of this intercept handler. diff --git a/broker/src/main/java/io/moquette/interception/Interceptor.java b/broker/src/main/java/io/moquette/interception/Interceptor.java index e69509818..ab0a15e0f 100644 --- a/broker/src/main/java/io/moquette/interception/Interceptor.java +++ b/broker/src/main/java/io/moquette/interception/Interceptor.java @@ -47,6 +47,8 @@ public interface Interceptor { void notifyMessageAcknowledged(InterceptAcknowledgedMessage msg); + void notifyLoopException(Throwable th); + void addInterceptHandler(InterceptHandler interceptHandler); void removeInterceptHandler(InterceptHandler interceptHandler); diff --git a/broker/src/main/java/io/moquette/interception/messages/InterceptExceptionMessage.java b/broker/src/main/java/io/moquette/interception/messages/InterceptExceptionMessage.java new file mode 100644 index 000000000..fda676ec0 --- /dev/null +++ b/broker/src/main/java/io/moquette/interception/messages/InterceptExceptionMessage.java @@ -0,0 +1,13 @@ +package io.moquette.interception.messages; + +public class InterceptExceptionMessage implements InterceptMessage { + private Throwable error; + + public InterceptExceptionMessage(Throwable error) { + this.error = error; + } + + public Throwable getError() { + return error; + } +} diff --git a/broker/src/main/java/io/moquette/persistence/EnqueuedMessageValueType.java b/broker/src/main/java/io/moquette/persistence/EnqueuedMessageValueType.java index fb471f8b8..302f3c7df 100644 --- a/broker/src/main/java/io/moquette/persistence/EnqueuedMessageValueType.java +++ b/broker/src/main/java/io/moquette/persistence/EnqueuedMessageValueType.java @@ -13,7 +13,7 @@ * * You may elect to redistribute this code under either of these licenses. */ -package io.moquette.persistence; +package persistence; import io.moquette.broker.SessionRegistry; import io.moquette.broker.SessionRegistry.EnqueuedMessage;