Skip to content

Commit

Permalink
Updated interception mechanism to notify also session event loop exce…
Browse files Browse the repository at this point in the history
…ptions
  • Loading branch information
andsel committed Mar 13, 2023
1 parent 538bb70 commit 41e9d8e
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 3 deletions.
3 changes: 3 additions & 0 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ public RouteResult ifFailed(Runnable action) {
// executed in session loop thread
// collect the exception thrown to later re-throw
loopThrownExceptions.put(loopThread.getName(), ex);

// This is done in asynch from another thread in BrokerInterceptor
interceptor.notifyLoopException(ex);
});
newLoop.start();
this.sessionExecutors[i] = newLoop;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.moquette.interception.messages.InterceptUnsubscribeMessage;

/**
* Basic abstract class usefull to avoid empty methods creation in subclasses.
* Basic abstract class useful to avoid empty methods creation in subclasses.
*/
public abstract class AbstractInterceptHandler implements InterceptHandler {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ public void notifyMessageAcknowledged(final InterceptAcknowledgedMessage msg) {
}
}

@Override
public void notifyLoopException(Throwable th) {
for (final InterceptHandler handler : this.handlers.get(InterceptExceptionMessage.class)) {

}
}

@Override
public void addInterceptHandler(InterceptHandler interceptHandler) {
Class<?>[] interceptedMessageTypes = getInterceptedMessageTypes(interceptHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

/**
* This interface is used to inject code for intercepting broker events.
* This is part of the API that integrator of Moquette has to implement.
* <p>
* The events can act only as observers.
* <p>
Expand All @@ -33,7 +34,7 @@ public interface InterceptHandler {

Class<?>[] ALL_MESSAGE_TYPES = {InterceptConnectMessage.class, InterceptDisconnectMessage.class,
InterceptConnectionLostMessage.class, InterceptPublishMessage.class, InterceptSubscribeMessage.class,
InterceptUnsubscribeMessage.class, InterceptAcknowledgedMessage.class};
InterceptUnsubscribeMessage.class, InterceptAcknowledgedMessage.class, InterceptExceptionMessage.class};

/**
* @return the identifier of this intercept handler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public interface Interceptor {

void notifyMessageAcknowledged(InterceptAcknowledgedMessage msg);

void notifyLoopException(Throwable th);

void addInterceptHandler(InterceptHandler interceptHandler);

void removeInterceptHandler(InterceptHandler interceptHandler);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.moquette.interception.messages;

public class InterceptExceptionMessage implements InterceptMessage {
private Throwable error;

public InterceptExceptionMessage(Throwable error) {
this.error = error;
}

public Throwable getError() {
return error;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*
* You may elect to redistribute this code under either of these licenses.
*/
package io.moquette.persistence;
package persistence;

import io.moquette.broker.SessionRegistry;
import io.moquette.broker.SessionRegistry.EnqueuedMessage;
Expand Down

0 comments on commit 41e9d8e

Please sign in to comment.