Skip to content

Commit

Permalink
mqtt-streaming: Republish messages on reconnect only by default
Browse files Browse the repository at this point in the history
Previously, messages for QoS1/2 were only republished on an interval
after not receiving an ack.

It is more conventional to instead republish everything only on connect,
and indeed to be compliant for MQTT 5, that is the only time this is
allowed.

To accomodate this, the timeouts default to 0, but the previous behavior
can still be restored by changing the default producer timeout settings.
  • Loading branch information
longshorej committed Mar 20, 2019
1 parent afe11a7 commit b1cbeb8
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ final class MqttSessionSettings private (val maxPacketSize: Int = 4096,
val eventParallelism: Int = 10,
val receiveConnectTimeout: FiniteDuration = 5.minutes,
val receiveConnAckTimeout: FiniteDuration = 30.seconds,
val producerPubAckRecTimeout: FiniteDuration = 15.seconds,
val producerPubCompTimeout: FiniteDuration = 15.seconds,
val producerPubAckRecTimeout: FiniteDuration = 0.seconds,
val producerPubCompTimeout: FiniteDuration = 0.seconds,
val consumerPubAckRecTimeout: FiniteDuration = 30.seconds,
val consumerPubCompTimeout: FiniteDuration = 30.seconds,
val consumerPubRelTimeout: FiniteDuration = 30.seconds,
Expand Down Expand Up @@ -105,7 +105,7 @@ final class MqttSessionSettings private (val maxPacketSize: Int = 4096,

/**
* For producers of PUBLISH, the amount of time to wait to ack/receive a QoS 1/2 publish before retrying with
* the DUP flag set. Defaults to 15 seconds.
* the DUP flag set. Defaults to 0 seconds, which means republishing only occurs on reconnect.
*/
def withProducerPubAckRecTimeout(producerPubAckRecTimeout: FiniteDuration): MqttSessionSettings =
copy(producerPubAckRecTimeout = producerPubAckRecTimeout)
Expand All @@ -114,14 +114,14 @@ final class MqttSessionSettings private (val maxPacketSize: Int = 4096,
* JAVA API
*
* For producers of PUBLISH, the amount of time to wait to ack/receive a QoS 1/2 publish before retrying with
* the DUP flag set. Defaults to 15 seconds.
* the DUP flag set. Defaults to 0 seconds, which means republishing only occurs on reconnect.
*/
def withProducerPubAckRecTimeout(producerPubAckRecTimeout: Duration): MqttSessionSettings =
copy(producerPubAckRecTimeout = producerPubAckRecTimeout.asScala)

/**
* For producers of PUBLISH, the amount of time to wait for a server to complete a QoS 2 publish before retrying
* with another PUBREL. Defaults to 15 seconds.
* with another PUBREL. Defaults to 0 seconds, which means republishing only occurs on reconnect.
*/
def withProducerPubCompTimeout(producerPubCompTimeout: FiniteDuration): MqttSessionSettings =
copy(producerPubCompTimeout = producerPubCompTimeout)
Expand All @@ -130,7 +130,7 @@ final class MqttSessionSettings private (val maxPacketSize: Int = 4096,
* JAVA API
*
* For producers of PUBLISH, the amount of time to wait for a server to complete a QoS 2 publish before retrying
* with another PUBREL. Defaults to 15 seconds.
* with another PUBREL. Defaults to 0 seconds, which means republishing only occurs on reconnect.
*/
def withProducerPubCompTimeout(producerPubCompTimeout: Duration): MqttSessionSettings =
copy(producerPubCompTimeout = producerPubCompTimeout.asScala)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ import scala.util.{Failure, Success}
)
)
} else {
data.activeProducers.values.foreach { producer =>
producer ! Producer.ReceiveConnect
}

serverConnect(
ConnectReceived(
connectionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import scala.util.{Failure, Success}
final case class PubRecReceivedFromRemote(local: Promise[ForwardPubRec]) extends Event
case object ReceivePubCompTimeout extends Event
final case class PubCompReceivedFromRemote(local: Promise[ForwardPubComp]) extends Event
case object ReceiveConnect extends Event

sealed abstract class Command
sealed abstract class ForwardPublishingCommand extends Command
Expand Down Expand Up @@ -116,7 +117,8 @@ import scala.util.{Failure, Success}
def publishUnacknowledged(data: Publishing)(implicit mat: Materializer): Behavior[Event] = Behaviors.withTimers {
val ReceivePubackrec = "producer-receive-pubackrec"
timer =>
timer.startSingleTimer(ReceivePubackrec, ReceivePubAckRecTimeout, data.settings.producerPubAckRecTimeout)
if (data.settings.producerPubAckRecTimeout.toNanos > 0L)
timer.startSingleTimer(ReceivePubackrec, ReceivePubAckRecTimeout, data.settings.producerPubAckRecTimeout)

Behaviors
.receiveMessagePartial[Event] {
Expand All @@ -129,7 +131,7 @@ import scala.util.{Failure, Success}
local.success(ForwardPubRec(data.publishData))
timer.cancel(ReceivePubackrec)
publishAcknowledged(data)
case ReceivePubAckRecTimeout =>
case ReceivePubAckRecTimeout | ReceiveConnect =>
data.remote.offer(
ForwardPublish(data.publish.copy(flags = data.publish.flags | ControlPacketFlags.DUP),
Some(data.packetId))
Expand All @@ -147,7 +149,8 @@ import scala.util.{Failure, Success}
def publishAcknowledged(data: Publishing)(implicit mat: Materializer): Behavior[Event] = Behaviors.withTimers {
val ReceivePubrel = "producer-receive-pubrel"
timer =>
timer.startSingleTimer(ReceivePubrel, ReceivePubCompTimeout, data.settings.producerPubCompTimeout)
if (data.settings.producerPubCompTimeout.toNanos > 0L)
timer.startSingleTimer(ReceivePubrel, ReceivePubCompTimeout, data.settings.producerPubCompTimeout)

data.remote.offer(ForwardPubRel(data.publish, data.packetId))

Expand All @@ -156,7 +159,7 @@ import scala.util.{Failure, Success}
case PubCompReceivedFromRemote(local) =>
local.success(ForwardPubComp(data.publishData))
Behaviors.stopped
case ReceivePubCompTimeout =>
case ReceivePubCompTimeout | ReceiveConnect =>
data.remote.offer(ForwardPubRel(data.publish, data.packetId))
publishAcknowledged(data)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,9 @@ import scala.util.{Failure, Success}
data.stash.foreach(context.self.tell)
timer.cancel(ReceiveConnAck)

data.activeProducers.values
.foreach(_ ! Producer.ReceiveConnect)

clientConnected(
ConnAckReplied(
data.connect,
Expand Down
69 changes: 69 additions & 0 deletions mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,75 @@ class MqttSessionSpec
client.watchCompletion().foreach(_ => session.shutdown())
}

"publish with a QoS of 1 and cause a retry given a reconnect" in {
val session = ActorMqttClientSession(settings.withProducerPubAckRecTimeout(0.millis))

val server = TestProbe()
val pipeToServer = Flow[ByteString].mapAsync(1)(msg => server.ref.ask(msg).mapTo[ByteString])

val connect = Connect("some-client-id", ConnectFlags.None)
val connectBytes = connect.encode(ByteString.newBuilder).result()
val connAck = ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionAccepted)
val connAckBytes = connAck.encode(ByteString.newBuilder).result()

val publish = Publish("some-topic", ByteString("some-payload"))
val publishBytes = publish.encode(ByteString.newBuilder, Some(PacketId(1))).result()
val publishDup = publish.copy(flags = publish.flags | ControlPacketFlags.DUP)
val publishDupBytes = publishDup.encode(ByteString.newBuilder, Some(PacketId(1))).result()
val pubAck = PubAck(PacketId(1))
val pubAckBytes = pubAck.encode(ByteString.newBuilder).result()

val firstClient =
Source
.queue(1, OverflowStrategy.fail)
.via(
Mqtt
.clientSessionFlow(session, ByteString("1"))
.join(pipeToServer)
)
.toMat(Sink.ignore)(Keep.left)
.run()

firstClient.offer(Command(connect))

server.expectMsg(connectBytes)
server.reply(connAckBytes)

session ! Command(publish)

server.expectMsg(publishBytes)

server.reply(connAckBytes)

firstClient.complete()

val secondClient =
Source
.queue(1, OverflowStrategy.fail)
.via(
Mqtt
.clientSessionFlow(session, ByteString("2"))
.join(pipeToServer)
)
.toMat(Sink.ignore)(Keep.left)
.run()

secondClient.offer(Command(connect))

server.expectMsg(connectBytes)
server.reply(connAckBytes)

server.expectMsg(publishDupBytes)
server.reply(pubAckBytes)

secondClient.complete()

for {
_ <- firstClient.watchCompletion()
_ <- secondClient.watchCompletion()
} yield session.shutdown()
}

"publish with QoS 2 and carry through an object to pubComp" in assertAllStagesStopped {
val session = ActorMqttClientSession(settings)

Expand Down

0 comments on commit b1cbeb8

Please sign in to comment.