diff --git a/src/Emitter.Runtime/Network/Mqtt/MqttContext.cs b/src/Emitter.Runtime/Network/Mqtt/MqttContext.cs index f0aac515..7f9c21f4 100644 --- a/src/Emitter.Runtime/Network/Mqtt/MqttContext.cs +++ b/src/Emitter.Runtime/Network/Mqtt/MqttContext.cs @@ -24,15 +24,29 @@ namespace Emitter.Network /// public sealed class MqttContext { + /// + /// Constructs a context from the connect packet. + /// + /// The packet to use. + internal MqttContext(MqttConnectPacket packet) + { + this.Version = packet.ProtocolVersion; + this.QoS = packet.QoS; + this.IsEmitter = packet.IsEmitter; + this.ClientId = packet.ClientId; + this.Username = packet.Username; + } + /// /// Creates a new context. /// /// The version of MQTT. /// Whether this is our special implementation. /// The client id specified in the MQTT connect packet. - public MqttContext(MqttProtocolVersion version, bool isEmitter, string id, string username) + public MqttContext(MqttProtocolVersion version, QoS qos, bool isEmitter, string id, string username) { this.Version = version; + this.QoS = qos; this.IsEmitter = isEmitter; this.ClientId = id; this.Username = username; @@ -48,6 +62,11 @@ public MqttContext(MqttProtocolVersion version, bool isEmitter, string id, strin /// public readonly bool IsEmitter; + /// + /// Gets the quality of service requested + /// + public readonly QoS QoS; + /// /// Gets the MQTT client id passed during the connect. /// diff --git a/src/Emitter.Runtime/Network/Mqtt/MqttHandler.cs b/src/Emitter.Runtime/Network/Mqtt/MqttHandler.cs index f133e633..403cd556 100644 --- a/src/Emitter.Runtime/Network/Mqtt/MqttHandler.cs +++ b/src/Emitter.Runtime/Network/Mqtt/MqttHandler.cs @@ -34,13 +34,7 @@ internal static unsafe class MqttHandler public static ProcessingState OnConnect(IClient client, MqttConnectPacket packet) { // Set the protocol version to the client - client.Context = new MqttContext( - packet.ProtocolVersion, - packet.IsEmitter, - packet.ClientId, - packet.Username - ); - + client.Context = new MqttContext(packet); switch (packet.ProtocolVersion) { case MqttProtocolVersion.V3_1: break; @@ -131,7 +125,9 @@ public static ProcessingState OnPublish(IClient client, MqttPublishPacket packet HandlePublish.Process(client, packet.Channel, packet.Message); // Send the ack and stop the processing - client.SendMqttPuback(packet.MessageId); + if (client.Context.QoS > QoS.AtMostOnce) + client.SendMqttPuback(packet.MessageId); + return ProcessingState.Stop; } diff --git a/src/Emitter.Runtime/Network/Mqtt/MqttSender.cs b/src/Emitter.Runtime/Network/Mqtt/MqttSender.cs index 56a3497c..396e46c0 100644 --- a/src/Emitter.Runtime/Network/Mqtt/MqttSender.cs +++ b/src/Emitter.Runtime/Network/Mqtt/MqttSender.cs @@ -95,7 +95,7 @@ public MqttContext Context { get { - return new MqttContext(MqttProtocolVersion.VEmitter, true, "broker", "broker"); + return new MqttContext(MqttProtocolVersion.VEmitter, QoS.AtMostOnce, true, "broker", "broker"); } set {