Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mqtt-streaming: Various fixes #1595

Merged
merged 5 commits into from
Mar 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.mqtt
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.mqtt.streaming
Expand Down Expand Up @@ -107,7 +107,7 @@ class MqttPerf {
.fromGraph(clientSource)
.via(
Mqtt
.clientSessionFlow(clientSession)
.clientSessionFlow(clientSession, ByteString("1"))
.join(Tcp().outgoingConnection(host, port))
)
.wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
# Allow changes to impl
ProblemFilters.exclude[Problem]("akka.stream.alpakka.mqtt.streaming.impl.*")
# PR #1595
# https://github.com/akka/alpakka/pull/1595
# private[streaming]
ProblemFilters.exclude[MissingMethodProblem]("akka.stream.alpakka.mqtt.streaming.scaladsl.MqttClientSession.commandFlow")
ProblemFilters.exclude[MissingMethodProblem]("akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession.commandFlow")
# private[streaming]
ProblemFilters.exclude[MissingMethodProblem]("akka.stream.alpakka.mqtt.streaming.scaladsl.MqttClientSession.eventFlow")
ProblemFilters.exclude[MissingMethodProblem]("akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession.eventFlow")
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ final class MqttSessionSettings private (val maxPacketSize: Int = 4096,
val eventParallelism: Int = 10,
val receiveConnectTimeout: FiniteDuration = 5.minutes,
val receiveConnAckTimeout: FiniteDuration = 30.seconds,
val producerPubAckRecTimeout: FiniteDuration = 15.seconds,
val producerPubCompTimeout: FiniteDuration = 15.seconds,
val producerPubAckRecTimeout: FiniteDuration = 0.seconds,
val producerPubCompTimeout: FiniteDuration = 0.seconds,
val consumerPubAckRecTimeout: FiniteDuration = 30.seconds,
val consumerPubCompTimeout: FiniteDuration = 30.seconds,
val consumerPubRelTimeout: FiniteDuration = 30.seconds,
Expand Down Expand Up @@ -105,7 +105,7 @@ final class MqttSessionSettings private (val maxPacketSize: Int = 4096,

/**
* For producers of PUBLISH, the amount of time to wait to ack/receive a QoS 1/2 publish before retrying with
* the DUP flag set. Defaults to 15 seconds.
* the DUP flag set. Defaults to 0 seconds, which means republishing only occurs on reconnect.
*/
def withProducerPubAckRecTimeout(producerPubAckRecTimeout: FiniteDuration): MqttSessionSettings =
copy(producerPubAckRecTimeout = producerPubAckRecTimeout)
Expand All @@ -114,14 +114,14 @@ final class MqttSessionSettings private (val maxPacketSize: Int = 4096,
* JAVA API
*
* For producers of PUBLISH, the amount of time to wait to ack/receive a QoS 1/2 publish before retrying with
* the DUP flag set. Defaults to 15 seconds.
* the DUP flag set. Defaults to 0 seconds, which means republishing only occurs on reconnect.
*/
def withProducerPubAckRecTimeout(producerPubAckRecTimeout: Duration): MqttSessionSettings =
copy(producerPubAckRecTimeout = producerPubAckRecTimeout.asScala)

/**
* For producers of PUBLISH, the amount of time to wait for a server to complete a QoS 2 publish before retrying
* with another PUBREL. Defaults to 15 seconds.
* with another PUBREL. Defaults to 0 seconds, which means republishing only occurs on reconnect.
*/
def withProducerPubCompTimeout(producerPubCompTimeout: FiniteDuration): MqttSessionSettings =
copy(producerPubCompTimeout = producerPubCompTimeout)
Expand All @@ -130,7 +130,7 @@ final class MqttSessionSettings private (val maxPacketSize: Int = 4096,
* JAVA API
*
* For producers of PUBLISH, the amount of time to wait for a server to complete a QoS 2 publish before retrying
* with another PUBREL. Defaults to 15 seconds.
* with another PUBREL. Defaults to 0 seconds, which means republishing only occurs on reconnect.
*/
def withProducerPubCompTimeout(producerPubCompTimeout: Duration): MqttSessionSettings =
copy(producerPubCompTimeout = producerPubCompTimeout.asScala)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import akka.actor.typed.{ActorRef, Behavior, ChildFailed, PostStop, Terminated}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.annotation.InternalApi
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source, SourceQueueWithComplete}
import akka.stream.scaladsl.{BroadcastHub, Keep, Source, SourceQueueWithComplete}
import akka.util.ByteString

import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -98,6 +99,7 @@ import scala.util.{Failure, Success}
settings
)
final case class ConnectReceived(
connectionId: ByteString,
connect: Connect,
connectData: ConnectData,
remote: SourceQueueWithComplete[ForwardConnectCommand],
Expand All @@ -124,6 +126,7 @@ import scala.util.{Failure, Success}
settings
)
final case class ConnAckReceived(
connectionId: ByteString,
connectFlags: ConnectFlags,
keepAlive: FiniteDuration,
pendingPingResp: Boolean,
Expand Down Expand Up @@ -151,32 +154,58 @@ import scala.util.{Failure, Success}
settings
)

sealed abstract class Event
final case class ConnectReceivedLocally(connect: Connect,
sealed abstract class Event(val connectionId: ByteString)

final case class ConnectReceivedLocally(override val connectionId: ByteString,
connect: Connect,
connectData: ConnectData,
remote: Promise[Source[ForwardConnectCommand, NotUsed]])
extends Event
final case class ConnAckReceivedFromRemote(connAck: ConnAck, local: Promise[ForwardConnAck]) extends Event
case object ReceiveConnAckTimeout extends Event
case object ConnectionLost extends Event
final case class DisconnectReceivedLocally(remote: Promise[ForwardDisconnect.type]) extends Event
final case class SubscribeReceivedLocally(subscribe: Subscribe,
extends Event(connectionId)
final case class ConnAckReceivedFromRemote(override val connectionId: ByteString,
connAck: ConnAck,
local: Promise[ForwardConnAck])
extends Event(connectionId)

case class ReceiveConnAckTimeout(override val connectionId: ByteString) extends Event(connectionId)

case class ConnectionLost(override val connectionId: ByteString) extends Event(connectionId)

final case class DisconnectReceivedLocally(override val connectionId: ByteString,
remote: Promise[ForwardDisconnect.type])
extends Event(connectionId)

final case class SubscribeReceivedLocally(override val connectionId: ByteString,
subscribe: Subscribe,
subscribeData: Subscriber.SubscribeData,
remote: Promise[Subscriber.ForwardSubscribe])
extends Event
final case class PublishReceivedFromRemote(publish: Publish, local: Promise[Consumer.ForwardPublish.type])
extends Event
final case class ConsumerFree(topicName: String) extends Event
final case class PublishReceivedLocally(publish: Publish, publishData: Producer.PublishData) extends Event
final case class ProducerFree(topicName: String) extends Event
case object SendPingReqTimeout extends Event
final case class PingRespReceivedFromRemote(local: Promise[ForwardPingResp.type]) extends Event
final case class ReceivedProducerPublishingCommand(command: Source[Producer.ForwardPublishingCommand, NotUsed])
extends Event
final case class UnsubscribeReceivedLocally(unsubscribe: Unsubscribe,
extends Event(connectionId)

final case class PublishReceivedFromRemote(override val connectionId: ByteString,
publish: Publish,
local: Promise[Consumer.ForwardPublish.type])
extends Event(connectionId)

final case class ConsumerFree(topicName: String) extends Event(ByteString.empty)

final case class PublishReceivedLocally(publish: Publish, publishData: Producer.PublishData)
extends Event(ByteString.empty)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These and related aren't associated with a connection id, so their connection id is an empty ByteString. This is similar to how the server does things.


final case class ProducerFree(topicName: String) extends Event(ByteString.empty)

case class SendPingReqTimeout(override val connectionId: ByteString) extends Event(connectionId)

final case class PingRespReceivedFromRemote(override val connectionId: ByteString,
local: Promise[ForwardPingResp.type])
extends Event(connectionId)

final case class ReceivedProducerPublishingCommand(command: Producer.ForwardPublishingCommand)
extends Event(ByteString.empty)

final case class UnsubscribeReceivedLocally(override val connectionId: ByteString,
unsubscribe: Unsubscribe,
unsubscribeData: Unsubscriber.UnsubscribeData,
remote: Promise[Unsubscriber.ForwardUnsubscribe])
extends Event
extends Event(connectionId)

sealed abstract class Command
sealed abstract class ForwardConnectCommand
Expand All @@ -196,11 +225,12 @@ import scala.util.{Failure, Success}
def disconnected(data: Disconnected)(implicit mat: Materializer): Behavior[Event] =
Behaviors
.receivePartial[Event] {
case (context, ConnectReceivedLocally(connect, connectData, remote)) =>
case (context, ConnectReceivedLocally(connectionId, connect, connectData, remote)) =>
val (queue, source) = Source
.queue[ForwardConnectCommand](1, OverflowStrategy.dropHead)
.toMat(BroadcastHub.sink)(Keep.both)
.run()

remote.success(source)

queue.offer(ForwardConnect)
Expand All @@ -210,6 +240,7 @@ import scala.util.{Failure, Success}
context.children.foreach(context.stop)
serverConnect(
ConnectReceived(
connectionId,
connect,
connectData,
queue,
Expand All @@ -226,8 +257,13 @@ import scala.util.{Failure, Success}
)
)
} else {
data.activeProducers.values.foreach { producer =>
producer ! Producer.ReceiveConnect
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main change on the client to let the producers know about new ReceiveConnect so that they can republish.


serverConnect(
ConnectReceived(
connectionId,
connect,
connectData,
queue,
Expand All @@ -245,7 +281,7 @@ import scala.util.{Failure, Success}
)

}
case (_, ConnectionLost) =>
case (_, ConnectionLost(_)) =>
Behavior.same
case (_, e) =>
disconnected(data.copy(stash = data.stash :+ e))
Expand Down Expand Up @@ -283,17 +319,25 @@ import scala.util.{Failure, Success}

timer =>
if (!timer.isTimerActive(ReceiveConnAck))
timer.startSingleTimer(ReceiveConnAck, ReceiveConnAckTimeout, data.settings.receiveConnAckTimeout)

timer.startSingleTimer(ReceiveConnAck,
ReceiveConnAckTimeout(data.connectionId),
data.settings.receiveConnAckTimeout)
Behaviors
.receivePartial[Event] {
case (context, ConnAckReceivedFromRemote(connAck, local))
case (context, connect @ ConnectReceivedLocally(connectionId, _, _, _))
if connectionId != data.connectionId =>
context.self ! connect
disconnect(context, data.remote, data)
case (_, event) if event.connectionId.nonEmpty && event.connectionId != data.connectionId =>
Behaviors.same
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case is responsible for letting empty (thus session bound) or matching connection ids fall through to other cases.

case (context, ConnAckReceivedFromRemote(_, connAck, local))
if connAck.returnCode.contains(ConnAckReturnCode.ConnectionAccepted) =>
local.success(ForwardConnAck(data.connectData))
data.stash.foreach(context.self.tell)
timer.cancel(ReceiveConnAck)
serverConnected(
ConnAckReceived(
data.connectionId,
data.connect.connectFlags,
data.connect.keepAlive,
pendingPingResp = false,
Expand All @@ -310,15 +354,15 @@ import scala.util.{Failure, Success}
data.settings
)
)
case (context, ConnAckReceivedFromRemote(_, local)) =>
case (context, ConnAckReceivedFromRemote(_, _, local)) =>
local.success(ForwardConnAck(data.connectData))
timer.cancel(ReceiveConnAck)
disconnect(context, data.remote, data)
case (context, ReceiveConnAckTimeout) =>
case (context, ReceiveConnAckTimeout(_)) =>
data.remote.fail(ConnectFailed)
timer.cancel(ReceiveConnAck)
disconnect(context, data.remote, data)
case (context, ConnectionLost) =>
case (context, ConnectionLost(_)) =>
timer.cancel(ReceiveConnAck)
disconnect(context, data.remote, data)
case (_, e) =>
Expand All @@ -339,33 +383,40 @@ import scala.util.{Failure, Success}
Behaviors.withTimers { timer =>
val SendPingreq = "send-pingreq"
if (resetPingReqTimer && data.keepAlive.toMillis > 0)
timer.startSingleTimer(SendPingreq, SendPingReqTimeout, data.keepAlive)
timer.startSingleTimer(SendPingreq, SendPingReqTimeout(data.connectionId), data.keepAlive)

Behaviors
.receivePartial[Event] {
case (context, ConnectionLost) =>
case (context, connect @ ConnectReceivedLocally(connectionId, _, _, _))
if connectionId != data.connectionId =>
context.self ! connect
disconnect(context, data.remote, data)
case (_, event) if event.connectionId.nonEmpty && event.connectionId != data.connectionId =>
Behaviors.same
case (context, ConnectionLost(_)) =>
timer.cancel(SendPingreq)
disconnect(context, data.remote, data)
case (context, DisconnectReceivedLocally(remote)) =>
case (context, DisconnectReceivedLocally(_, remote)) =>
remote.success(ForwardDisconnect)
timer.cancel(SendPingreq)
disconnect(context, data.remote, data)
case (context, SubscribeReceivedLocally(_, subscribeData, remote)) =>
case (context, SubscribeReceivedLocally(_, _, subscribeData, remote)) =>
context.watch(
context.spawnAnonymous(Subscriber(subscribeData, remote, data.subscriberPacketRouter, data.settings))
)
serverConnected(data)
case (context, UnsubscribeReceivedLocally(_, unsubscribeData, remote)) =>
case (context, UnsubscribeReceivedLocally(_, _, unsubscribeData, remote)) =>
context.watch(
context
.spawnAnonymous(Unsubscriber(unsubscribeData, remote, data.unsubscriberPacketRouter, data.settings))
)
serverConnected(data)
case (_, PublishReceivedFromRemote(publish, local))
case (_, PublishReceivedFromRemote(_, publish, local))
if (publish.flags & ControlPacketFlags.QoSReserved).underlying == 0 =>
local.success(Consumer.ForwardPublish)
serverConnected(data, resetPingReqTimer = false)
case (context, prfr @ PublishReceivedFromRemote(publish @ Publish(_, topicName, Some(packetId), _), local)) =>
case (context,
prfr @ PublishReceivedFromRemote(_, publish @ Publish(_, topicName, Some(packetId), _), local)) =>
data.activeConsumers.get(topicName) match {
case None =>
val consumerName = ActorName.mkName(ConsumerNamePrefix + topicName + "-" + context.children.size)
Expand Down Expand Up @@ -420,8 +471,11 @@ import scala.util.{Failure, Success}
val producerName = ActorName.mkName(ProducerNamePrefix + publish.topicName + "-" + context.children.size)
if (!data.activeProducers.contains(publish.topicName)) {
val reply = Promise[Source[Producer.ForwardPublishingCommand, NotUsed]]
import context.executionContext
reply.future.foreach(command => context.self ! ReceivedProducerPublishingCommand(command))

Source
.fromFutureSource(reply.future)
.runForeach(msg => context.self ! ReceivedProducerPublishingCommand(msg))

val producer =
context.spawn(Producer(publish, publishData, reply, data.producerPacketRouter, data.settings),
producerName)
Expand All @@ -441,8 +495,11 @@ import scala.util.{Failure, Success}
val prl = data.pendingLocalPublications(i)._2
val producerName = ActorName.mkName(ProducerNamePrefix + topicName + "-" + context.children.size)
val reply = Promise[Source[Producer.ForwardPublishingCommand, NotUsed]]
import context.executionContext
reply.future.foreach(command => context.self ! ReceivedProducerPublishingCommand(command))

Source
.fromFutureSource(reply.future)
.runForeach(msg => context.self ! ReceivedProducerPublishingCommand(msg))

val producer = context.spawn(
Producer(prl.publish, prl.publishData, reply, data.producerPacketRouter, data.settings),
producerName
Expand All @@ -461,20 +518,20 @@ import scala.util.{Failure, Success}
} else {
serverConnected(data.copy(activeProducers = data.activeProducers - topicName))
}
case (_, ReceivedProducerPublishingCommand(command)) =>
command.runWith(Sink.foreach {
case Producer.ForwardPublish(publish, packetId) => data.remote.offer(ForwardPublish(publish, packetId))
case Producer.ForwardPubRel(_, packetId) => data.remote.offer(ForwardPubRel(packetId))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closing over old state -- if the client ever reconnected, data.remote would be from an old connection and thus never actually republished.

})
case (_, ReceivedProducerPublishingCommand(Producer.ForwardPublish(publish, packetId))) =>
data.remote.offer(ForwardPublish(publish, packetId))
Behaviors.same
case (_, ReceivedProducerPublishingCommand(Producer.ForwardPubRel(_, packetId))) =>
data.remote.offer(ForwardPubRel(packetId))
Behaviors.same
case (context, SendPingReqTimeout) if data.pendingPingResp =>
case (context, SendPingReqTimeout(_)) if data.pendingPingResp =>
data.remote.fail(PingFailed)
timer.cancel(SendPingreq)
disconnect(context, data.remote, data)
case (_, SendPingReqTimeout) =>
case (_, SendPingReqTimeout(_)) =>
data.remote.offer(ForwardPingReq)
serverConnected(data.copy(pendingPingResp = true))
case (_, PingRespReceivedFromRemote(local)) =>
case (_, PingRespReceivedFromRemote(_, local)) =>
local.success(ForwardPingResp)
serverConnected(data.copy(pendingPingResp = false))
}
Expand Down
Loading