Skip to content

Commit

Permalink
Updated interception mechanism to notify also session event loop exce…
Browse files Browse the repository at this point in the history
…ptions
  • Loading branch information
andsel committed Mar 13, 2023
1 parent 538bb70 commit ac8ea0b
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 4 deletions.
4 changes: 4 additions & 0 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.interception.messages.InterceptExceptionMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.*;
Expand Down Expand Up @@ -198,6 +199,9 @@ public RouteResult ifFailed(Runnable action) {
// executed in session loop thread
// collect the exception thrown to later re-throw
loopThrownExceptions.put(loopThread.getName(), ex);

// This is done in asynch from another thread in BrokerInterceptor
interceptor.notifyLoopException(new InterceptExceptionMessage(ex));
});
newLoop.start();
this.sessionExecutors[i] = newLoop;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.moquette.interception.messages.InterceptUnsubscribeMessage;

/**
* Basic abstract class usefull to avoid empty methods creation in subclasses.
* Basic abstract class useful to avoid empty methods creation in subclasses.
*/
public abstract class AbstractInterceptHandler implements InterceptHandler {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ public void notifyMessageAcknowledged(final InterceptAcknowledgedMessage msg) {
}
}

@Override
public void notifyLoopException(InterceptExceptionMessage msg) {
for (final InterceptHandler handler : this.handlers.get(InterceptExceptionMessage.class)) {
handler.onSessionLoopError(msg.getError());
}
}

@Override
public void addInterceptHandler(InterceptHandler interceptHandler) {
Class<?>[] interceptedMessageTypes = getInterceptedMessageTypes(interceptHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

/**
* This interface is used to inject code for intercepting broker events.
* This is part of the API that integrator of Moquette has to implement.
* <p>
* The events can act only as observers.
* <p>
Expand All @@ -33,7 +34,7 @@ public interface InterceptHandler {

Class<?>[] ALL_MESSAGE_TYPES = {InterceptConnectMessage.class, InterceptDisconnectMessage.class,
InterceptConnectionLostMessage.class, InterceptPublishMessage.class, InterceptSubscribeMessage.class,
InterceptUnsubscribeMessage.class, InterceptAcknowledgedMessage.class};
InterceptUnsubscribeMessage.class, InterceptAcknowledgedMessage.class, InterceptExceptionMessage.class};

/**
* @return the identifier of this intercept handler.
Expand Down Expand Up @@ -65,4 +66,6 @@ public interface InterceptHandler {
void onUnsubscribe(InterceptUnsubscribeMessage msg);

void onMessageAcknowledged(InterceptAcknowledgedMessage msg);

void onSessionLoopError(Throwable error);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.moquette.interception.messages.InterceptAcknowledgedMessage;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.interception.messages.InterceptExceptionMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;

Expand Down Expand Up @@ -47,6 +48,8 @@ public interface Interceptor {

void notifyMessageAcknowledged(InterceptAcknowledgedMessage msg);

void notifyLoopException(InterceptExceptionMessage th);

void addInterceptHandler(InterceptHandler interceptHandler);

void removeInterceptHandler(InterceptHandler interceptHandler);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.moquette.interception.messages;

public class InterceptExceptionMessage implements InterceptMessage {
private Throwable error;

public InterceptExceptionMessage(Throwable error) {
this.error = error;
}

public Throwable getError() {
return error;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;
import org.jetbrains.annotations.NotNull;
//import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -85,7 +85,7 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel
return new MQTTConnection(channel, config, mockAuthenticator, sessionRegistry, postOffice);
}

@NotNull
// @NotNull
static ISessionsRepository memorySessionsRepository() {
return new MemorySessionsRepository();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public void onUnsubscribe(InterceptUnsubscribeMessage msg) {
public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) {
n.set(90);
}

@Override
public void onSessionLoopError(Throwable error) {
throw new RuntimeException(error);
}
}

private static final BrokerInterceptor interceptor = new BrokerInterceptor(
Expand Down

0 comments on commit ac8ea0b

Please sign in to comment.