Skip to content

Commit

Permalink
mqtt-streaming: Only publish when reconnecting, not after a timeout
Browse files Browse the repository at this point in the history
This brings things inline with MQTT v5 which tightens what's allowed:

> When a Client reconnects with Clean Start set to 0 and a session is present, both the Client and Server MUST resend any unacknowledged PUBLISH packets (where QoS > 0) and PUBREL packets using their original Packet Identifiers. This is the only circumstance where a Client or Server is REQUIRED to resend messages. Clients and Servers MUST NOT resend messages at any other time [MQTT-4.4.0-1].

WIP: Need to verify a few things, update tests
  • Loading branch information
longshorej committed Mar 19, 2019
1 parent 33c0f31 commit 0ea893a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ import scala.util.{Failure, Success}
)
)
} else {
data.activeProducers.values
.foreach(_ ! Producer.ReceiveConnect)

serverConnect(
ConnectReceived(
connect,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ import scala.util.{Failure, Success}
sealed abstract class Event
final case class AcquiredPacketId(packetId: PacketId) extends Event
final case object UnacquiredPacketId extends Event
case object ReceiveConnect extends Event
case object ReceivePubAckRecTimeout extends Event
final case class PubAckReceivedFromRemote(local: Promise[ForwardPubAck]) extends Event
final case class PubRecReceivedFromRemote(local: Promise[ForwardPubRec]) extends Event
case object ReceivePubCompTimeout extends Event
final case class PubCompReceivedFromRemote(local: Promise[ForwardPubComp]) extends Event

sealed abstract class Command
Expand Down Expand Up @@ -113,60 +113,46 @@ import scala.util.{Failure, Success}
}
}

def publishUnacknowledged(data: Publishing)(implicit mat: Materializer): Behavior[Event] = Behaviors.withTimers {
val ReceivePubackrec = "producer-receive-pubackrec"
timer =>
timer.startSingleTimer(ReceivePubackrec, ReceivePubAckRecTimeout, data.settings.producerPubAckRecTimeout)

Behaviors
.receiveMessagePartial[Event] {
case PubAckReceivedFromRemote(local)
if data.publish.flags.contains(ControlPacketFlags.QoSAtLeastOnceDelivery) =>
local.success(ForwardPubAck(data.publishData))
Behaviors.stopped
case PubRecReceivedFromRemote(local)
if data.publish.flags.contains(ControlPacketFlags.QoSExactlyOnceDelivery) =>
local.success(ForwardPubRec(data.publishData))
timer.cancel(ReceivePubackrec)
publishAcknowledged(data)
case ReceivePubAckRecTimeout =>
data.remote.offer(
ForwardPublish(data.publish.copy(flags = data.publish.flags | ControlPacketFlags.DUP),
Some(data.packetId))
)
publishUnacknowledged(data)
}
.receiveSignal {
case (_, PostStop) =>
data.packetRouter ! LocalPacketRouter.Unregister(data.packetId)
data.remote.complete()
Behaviors.same
}
}
def publishUnacknowledged(data: Publishing)(implicit mat: Materializer): Behavior[Event] =
Behaviors
.receiveMessagePartial[Event] {
case PubAckReceivedFromRemote(local)
if data.publish.flags.contains(ControlPacketFlags.QoSAtLeastOnceDelivery) =>
local.success(ForwardPubAck(data.publishData))
Behaviors.stopped
case PubRecReceivedFromRemote(local)
if data.publish.flags.contains(ControlPacketFlags.QoSExactlyOnceDelivery) =>
local.success(ForwardPubRec(data.publishData))
publishAcknowledged(data)
case ReceiveConnect =>
data.remote.offer(
ForwardPublish(data.publish.copy(flags = data.publish.flags | ControlPacketFlags.DUP), Some(data.packetId))
)
publishUnacknowledged(data)
}
.receiveSignal {
case (_, PostStop) =>
data.packetRouter ! LocalPacketRouter.Unregister(data.packetId)
data.remote.complete()
Behaviors.same
}

def publishAcknowledged(data: Publishing)(implicit mat: Materializer): Behavior[Event] = Behaviors.withTimers {
val ReceivePubrel = "producer-receive-pubrel"
timer =>
timer.startSingleTimer(ReceivePubrel, ReceivePubCompTimeout, data.settings.producerPubCompTimeout)

data.remote.offer(ForwardPubRel(data.publish, data.packetId))

Behaviors
.receiveMessagePartial[Event] {
case PubCompReceivedFromRemote(local) =>
local.success(ForwardPubComp(data.publishData))
Behaviors.stopped
case ReceivePubCompTimeout =>
data.remote.offer(ForwardPubRel(data.publish, data.packetId))
publishAcknowledged(data)
}
.receiveSignal {
case (_, PostStop) =>
data.packetRouter ! LocalPacketRouter.Unregister(data.packetId)
data.remote.complete()
Behaviors.same
}
}
def publishAcknowledged(data: Publishing)(implicit mat: Materializer): Behavior[Event] =
Behaviors
.receiveMessagePartial[Event] {
case PubCompReceivedFromRemote(local) =>
local.success(ForwardPubComp(data.publishData))
Behaviors.stopped
case ReceiveConnect =>
data.remote.offer(ForwardPubRel(data.publish, data.packetId))
publishAcknowledged(data)
}
.receiveSignal {
case (_, PostStop) =>
data.packetRouter ! LocalPacketRouter.Unregister(data.packetId)
data.remote.complete()
Behaviors.same
}
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ import scala.util.{Failure, Success}

Behaviors
.receivePartial[Event] {
case (context, ConnAckReceivedLocally(_, remote)) =>
case (context, ConnAckReceivedLocally(ack, remote)) =>
val (queue, source) = Source
.queue[ForwardConnAckCommand](data.settings.serverSendBufferSize, OverflowStrategy.dropNew)
.toMat(BroadcastHub.sink)(Keep.both)
Expand All @@ -390,6 +390,9 @@ import scala.util.{Failure, Success}
data.stash.foreach(context.self.tell)
timer.cancel(ReceiveConnAck)

data.activeProducers.values
.foreach(_ ! Producer.ReceiveConnect)

clientConnected(
ConnAckReplied(
data.connect,
Expand Down

0 comments on commit 0ea893a

Please sign in to comment.