From c2bf13b7b8d7b1e8eb98e25a1e9440d2b56541a3 Mon Sep 17 00:00:00 2001 From: huntc Date: Sun, 28 Jul 2019 17:52:42 +1000 Subject: [PATCH] Failure to attain QoS is not bad Prior to this commit, a failure to attain a QoS level i.e. subscribe, was considered bad. The spec treats this condition as one being something for the application to handle e.g. a client can decide to proceed or not given that one of the topics being subscribed was able to be so. The fix is to deprecate the BadSubAckMessage and always return SubAck. This then permits the Subscriber to process SubAcks whether they are good or bad from the application's perspective. --- .../stream/alpakka/mqtt/streaming/model.scala | 14 ++++---------- .../test/scala/docs/scaladsl/MqttCodecSpec.scala | 16 +++------------- 2 files changed, 7 insertions(+), 23 deletions(-) diff --git a/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/model.scala b/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/model.scala index a345ebc633..793276ec55 100644 --- a/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/model.scala +++ b/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/model.scala @@ -550,8 +550,10 @@ object MqttCodec { extends DecodeError /** - * Something is wrong with the subscribe ack message + * Unable to subscribe at the requested QoS + * @deprecated this message was never able to be returned - always use [[SubAck]] to test subscribed QoS, since 1.1.1 */ + @deprecated("this message was never able to be returned - always use [[SubAck]] to test subscribed QoS", "1.1.1") final case class BadSubAckMessage(packetId: PacketId, returnCodes: Seq[ControlPacketFlags]) extends DecodeError /** @@ -1005,15 +1007,7 @@ object MqttCodec { returnCodes } val returnCodes = decodeReturnCodes(l - (packetLen - v.len), Vector.empty) - val returnCodesValid = returnCodes.nonEmpty && returnCodes.foldLeft(true) { - case (true, rc) if rc.underlying < ControlPacketFlags.QoSReserved.underlying => true - case _ => false - } - if (returnCodesValid) { - Right(SubAck(packetId, returnCodes)) - } else { - Left(BadSubAckMessage(packetId, returnCodes)) - } + Right(SubAck(packetId, returnCodes)) } catch { case _: NoSuchElementException => Left(BufferUnderflow) } diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala index 335785dad4..cd38ff7f4b 100644 --- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala +++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala @@ -358,23 +358,13 @@ class MqttCodecSpec extends WordSpec with Matchers { bytes.iterator.decodeControlPacket(MaxPacketSize) shouldBe Right(packet) } - "bad sub ack message when decoding sub ack packets given failure QoS" in { + "regular sub ack message when decoding sub ack packets given failure QoS" in { val bsb: ByteStringBuilder = ByteString.newBuilder val packet = SubAck(PacketId(1), List(ControlPacketFlags.QoSExactlyOnceDelivery, ControlPacketFlags.QoSFailure)) val bytes = packet.encode(bsb).result() bytes.iterator - .decodeControlPacket(MaxPacketSize) shouldBe Left( - BadSubAckMessage(PacketId(1), List(ControlPacketFlags.QoSExactlyOnceDelivery, ControlPacketFlags.QoSFailure)) - ) - } - - "bad sub ack message when decoding sub ack packets given no topics" in { - val bsb: ByteStringBuilder = ByteString.newBuilder - val packet = SubAck(PacketId(1), List.empty) - val bytes = packet.encode(bsb).result() - bytes.iterator - .decodeControlPacket(MaxPacketSize) shouldBe Left( - BadSubAckMessage(PacketId(1), List.empty) + .decodeControlPacket(MaxPacketSize) shouldBe Right( + SubAck(PacketId(1), List(ControlPacketFlags.QoSExactlyOnceDelivery, ControlPacketFlags.QoSFailure)) ) }