diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index 903f9af35..406582da9 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -26,9 +26,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; import java.net.InetSocketAddress; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Queue; @@ -38,20 +41,44 @@ public class SessionRegistry { - public abstract static class EnqueuedMessage { + public abstract static class EnqueuedMessage implements Serializable { } static class PublishedMessage extends EnqueuedMessage { - final Topic topic; - final MqttQoS publishingQos; - final ByteBuf payload; + Topic topic; + MqttQoS publishingQos; + transient ByteBuf payload; + + public PublishedMessage() { + this.topic = null; + this.publishingQos = null; + } PublishedMessage(Topic topic, MqttQoS publishingQos, ByteBuf payload) { this.topic = topic; this.publishingQos = publishingQos; this.payload = payload; } + + private void writeObject(ObjectOutputStream oos) throws IOException { + oos.defaultWriteObject(); + byte[] byteArr = new byte[payload.readableBytes()]; + ByteBuf copy = payload.copy(); + copy.readBytes(byteArr); + copy.release(); + oos.writeInt(byteArr.length); + oos.write(byteArr); + } + + private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { + ois.defaultReadObject(); + int length = ois.readInt(); + byte[] byteArr = new byte[length]; + ois.read(byteArr); + payload = Unpooled.wrappedBuffer(byteArr); + } + } static final class PubRelMarker extends EnqueuedMessage {