Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ack MqttMessage in Sink Flow #1747

Merged
merged 29 commits into from
Jun 20, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e529c83
add ack after publish
nsandroni Jun 6, 2019
a0a2efe
rollback mqttflowstage and refactor with ack
nsandroni Jun 10, 2019
8de1b99
test added
nsandroni Jun 11, 2019
e1ebeec
format code
nsandroni Jun 11, 2019
c7cc03f
await for result
nsandroni Jun 11, 2019
06392df
unig testsink & testsource
nsandroni Jun 12, 2019
28ec954
closing run-flow for paradox
nsandroni Jun 12, 2019
18f286f
Try for prevent failure on await
nsandroni Jun 12, 2019
1de184d
rebuild
nsandroni Jun 12, 2019
5573dcd
rebuild
nsandroni Jun 12, 2019
917f77a
rebuild
nsandroni Jun 12, 2019
b4fc990
using inheritance from MqttFlowStage
nsandroni Jun 12, 2019
1bd54ae
add mima filters
nsandroni Jun 13, 2019
469f8ed
moving mima in the right folder
nsandroni Jun 13, 2019
a322fb5
mima filter, using abstract flowstage
nsandroni Jun 14, 2019
5dfb9f0
using same config for access to mqtt
nsandroni Jun 14, 2019
dfab610
remove useless method from MqttSource, removing marker from test, add…
nsandroni Jun 14, 2019
38e68af
format and remove tag
nsandroni Jun 14, 2019
6f3bea2
random topic and sleep for ack
nsandroni Jun 17, 2019
0e518f8
scala documentation for MqttFlow.atLeastOnceWithAck
nsandroni Jun 17, 2019
fa4350d
abstract class MqttMessageWithAckImpl for javadsl, add documentation …
nsandroni Jun 18, 2019
fa1eeb0
add eventually on test scala
nsandroni Jun 18, 2019
97387ad
indent
nsandroni Jun 18, 2019
018cc27
add java test
nsandroni Jun 18, 2019
d8f0644
import snipplet on docs, test are now with fixed topic ( from acl fil…
nsandroni Jun 19, 2019
bdb7978
Update docs/src/main/paradox/mqtt.md
nsandroni Jun 20, 2019
9f88618
Update docs/src/main/paradox/mqtt.md
nsandroni Jun 20, 2019
3e1e95a
Update docs/src/main/paradox/mqtt.md
nsandroni Jun 20, 2019
780cf73
Update docs/src/main/paradox/mqtt.md
nsandroni Jun 20, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.mqtt.impl

import java.util.Properties
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.annotation.InternalApi
import akka.stream._
import akka.stream.alpakka.mqtt._
import akka.stream.alpakka.mqtt.scaladsl.MqttMessageWithAck
import akka.stream.stage._
import akka.util.ByteString
import org.eclipse.paho.client.mqttv3.{
IMqttActionListener,
IMqttAsyncClient,
IMqttDeliveryToken,
IMqttToken,
MqttAsyncClient,
MqttCallbackExtended,
MqttConnectOptions,
MqttException,
MqttMessage => PahoMqttMessage
}

import scala.collection.mutable
import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions
import akka.stream.alpakka.mqtt.MqttOfflinePersistenceSettings

/**
* INTERNAL API
*/

@InternalApi
private[mqtt] final class MqttFlowStageWithAck(connectionSettings: MqttConnectionSettings,
subscriptions: Map[String, MqttQoS],
bufferSize: Int,
defaultQoS: MqttQoS,
manualAcks: Boolean = false)
extends GraphStageWithMaterializedValue[FlowShape[MqttMessageWithAck, MqttMessageWithAck], Future[Done]] {
import MqttFlowStageWithAck._

private val in = Inlet[MqttMessageWithAck]("MqttFlow.in")
private val out = Outlet[MqttMessageWithAck]("MqttFlow.out")
override val shape: Shape = FlowShape(in, out)
override protected def initialAttributes: Attributes = Attributes.name("MqttFlow")

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val subscriptionPromise = Promise[Done]
val logic = new GraphStageLogic(shape) with StageLogging with InHandler with OutHandler {
private val backpressurePahoClient = new Semaphore(bufferSize)
private var pendingMsg = Option.empty[MqttMessageWithAck]
private val queue = mutable.Queue[MqttMessageWithAck]()
private val messagesToAck: mutable.HashMap[Int, MqttMessageWithAck] = mutable.HashMap()
private val unackedMessages = new AtomicInteger()

private val onSubscribe: AsyncCallback[Try[IMqttToken]] = getAsyncCallback[Try[IMqttToken]] { conn =>
subscriptionPromise.complete(conn.map(_ => {
log.debug("subscription established")
Done
}))
pull(in)
}

private val onConnect: AsyncCallback[IMqttAsyncClient] =
getAsyncCallback[IMqttAsyncClient]((client: IMqttAsyncClient) => {
log.debug("connected")
if (subscriptions.nonEmpty) {
if (manualAcks) client.setManualAcks(true)
val (topics, qoses) = subscriptions.unzip
client.subscribe(topics.toArray, qoses.map(_.value).toArray, (), asActionListener(onSubscribe.invoke))
} else {
subscriptionPromise.complete(SuccessfullyDone)
pull(in)
}
})

private val onConnectionLost: AsyncCallback[Throwable] = getAsyncCallback[Throwable](failStageWith)

private val onMessageAsyncCallback: AsyncCallback[MqttMessageWithAck] =
getAsyncCallback[MqttMessageWithAck] { message =>
if (isAvailable(out)) {
pushDownstream(message)
} else if (queue.size + 1 > bufferSize) {
failStageWith(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
} else {
queue.enqueue(message)
}
}

private val onPublished: AsyncCallback[Try[IMqttToken]] = getAsyncCallback[Try[IMqttToken]] {
case Success(_) => if (!hasBeenPulled(in)) pull(in)
case Failure(ex) => failStageWith(ex)
}

private def createPahoBufferOptions(settings: MqttOfflinePersistenceSettings): DisconnectedBufferOptions = {

val disconnectedBufferOptions = new DisconnectedBufferOptions()

disconnectedBufferOptions.setBufferEnabled(true)
disconnectedBufferOptions.setBufferSize(settings.bufferSize)
disconnectedBufferOptions.setDeleteOldestMessages(settings.deleteOldestMessage)
disconnectedBufferOptions.setPersistBuffer(settings.persistBuffer)

disconnectedBufferOptions
}

private val client = new MqttAsyncClient(
connectionSettings.broker,
connectionSettings.clientId,
connectionSettings.persistence
)

private def mqttClient =
connectionSettings.offlinePersistenceSettings match {
case Some(bufferOpts) =>
client.setBufferOpts(createPahoBufferOptions(bufferOpts))

client
case _ => client
}

private val commitCallback: AsyncCallback[CommitCallbackArguments] =
getAsyncCallback[CommitCallbackArguments](
(args: CommitCallbackArguments) =>
try {
mqttClient.messageArrivedComplete(args.messageId, args.qos.value)
if (unackedMessages.decrementAndGet() == 0 && (isClosed(out) || (isClosed(in) && queue.isEmpty)))
completeStage()
args.promise.complete(SuccessfullyDone)
} catch {
case e: Throwable => args.promise.failure(e)
}
)

mqttClient.setCallback(new MqttCallbackExtended {
override def messageArrived(topic: String, pahoMessage: PahoMqttMessage): Unit = {
backpressurePahoClient.acquire()
val message = new MqttMessageWithAck {
override val message = MqttMessage(topic, ByteString.fromArrayUnsafe(pahoMessage.getPayload))

override def ack(): Future[Done] = {
val promise = Promise[Done]()
val qos = pahoMessage.getQos match {
case 0 => MqttQoS.AtMostOnce
case 1 => MqttQoS.AtLeastOnce
case 2 => MqttQoS.ExactlyOnce
}
commitCallback.invoke(CommitCallbackArguments(pahoMessage.getId, qos, promise))
promise.future
}
}
onMessageAsyncCallback.invoke(message)
}

override def deliveryComplete(token: IMqttDeliveryToken): Unit =
if (messagesToAck.isDefinedAt(token.getMessageId)) {
messagesToAck(token.getMessageId).ack()
messagesToAck.remove(token.getMessageId)
}

override def connectionLost(cause: Throwable): Unit =
if (!connectionSettings.automaticReconnect) {
log.info("connection lost (you might want to enable `automaticReconnect` in `MqttConnectionSettings`)")
onConnectionLost.invoke(cause)
} else {
log.info("connection lost, trying to reconnect")
}

override def connectComplete(reconnect: Boolean, serverURI: String): Unit = {
pendingMsg.foreach { msg: MqttMessageWithAck =>
publishToMqtt(msg)
}
if (reconnect && !hasBeenPulled(in)) pull(in)
}
})

// InHandler
override def onPush(): Unit = {
val msg = grab(in)
try {
publishToMqtt(msg)
} catch {
case _: MqttException if connectionSettings.automaticReconnect => pendingMsg = Some(msg)
case NonFatal(e) => throw e
}
}

override def onUpstreamFinish(): Unit = {
setKeepGoing(true)
if (queue.isEmpty && unackedMessages.get() == 0) super.onUpstreamFinish()
}

override def onUpstreamFailure(ex: Throwable): Unit = {
setKeepGoing(true)
if (queue.isEmpty && unackedMessages.get() == 0) super.onUpstreamFailure(ex)
}

// OutHandler
override def onPull(): Unit =
if (queue.nonEmpty) {
pushDownstream(queue.dequeue())
if (unackedMessages.get() == 0 && isClosed(in)) completeStage()
}

override def onDownstreamFinish(): Unit = {
setKeepGoing(true)
if (unackedMessages.get() == 0) super.onDownstreamFinish()
}

setHandlers(in, out, this)

private def publishToMqtt(msg: MqttMessage): IMqttDeliveryToken = {
val pahoMsg = new PahoMqttMessage(msg.payload.toArray)
pahoMsg.setQos(msg.qos.getOrElse(defaultQoS).value)
pahoMsg.setRetained(msg.retained)
mqttClient.publish(msg.topic, pahoMsg, msg, asActionListener(onPublished.invoke))
}

private def publishToMqtt(msg: MqttMessageWithAck): Unit = {
val publish = publishToMqtt(msg.message)
messagesToAck ++= mutable.HashMap(publish.getMessageId -> msg)
}

private def pushDownstream(message: MqttMessageWithAck): Unit = {
push(out, message)
backpressurePahoClient.release()
if (manualAcks) unackedMessages.incrementAndGet()
}

private def failStageWith(ex: Throwable): Unit = {
subscriptionPromise.tryFailure(ex)
failStage(ex)
}

override def preStart(): Unit =
try {
mqttClient.connect(
asConnectOptions(connectionSettings),
(),
new IMqttActionListener {
override def onSuccess(v: IMqttToken): Unit = onConnect.invoke(v.getClient)
override def onFailure(asyncActionToken: IMqttToken, ex: Throwable): Unit = onConnectionLost.invoke(ex)
}
)
} catch {
case e: Throwable => failStageWith(e)
}

override def postStop(): Unit = {
if (!subscriptionPromise.isCompleted)
subscriptionPromise
.tryFailure(
new IllegalStateException("Cannot complete subscription because the stage is about to stop or fail")
)

try {
log.debug("stage stopped, disconnecting")
mqttClient.disconnect(
connectionSettings.disconnectQuiesceTimeout.toMillis,
null,
new IMqttActionListener {
override def onSuccess(asyncActionToken: IMqttToken): Unit = mqttClient.close()

override def onFailure(asyncActionToken: IMqttToken, exception: Throwable): Unit = {
// Use 0 quiesce timeout as we have already quiesced in `disconnect`
mqttClient.disconnectForcibly(0, connectionSettings.disconnectTimeout.toMillis)
// Only disconnected client can be closed
mqttClient.close()
}
}
)
} catch {
// Not to worry - disconnect is best effort - don't worry if already disconnected
case _: MqttException =>
try {
mqttClient.close()
} catch {
case _: MqttException =>
}
}
}
}
(logic, subscriptionPromise.future)
}
}

/**
* INTERNAL API
*/
@InternalApi
private[mqtt] object MqttFlowStageWithAck {
private val SuccessfullyDone = Success(Done)
final private case class CommitCallbackArguments(messageId: Int, qos: MqttQoS, promise: Promise[Done])
def asConnectOptions(connectionSettings: MqttConnectionSettings): MqttConnectOptions =
MqttFlowStage.asConnectOptions(connectionSettings)
def asActionListener(func: Try[IMqttToken] => Unit): IMqttActionListener = MqttFlowStage.asActionListener(func)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package akka.stream.alpakka.mqtt.scaladsl

import akka.Done
import akka.stream.alpakka.mqtt._
import akka.stream.alpakka.mqtt.impl.MqttFlowStage
import akka.stream.alpakka.mqtt.impl.{MqttFlowStage, MqttFlowStageWithAck}
import akka.stream.scaladsl.Flow

import scala.concurrent.Future
Expand Down Expand Up @@ -106,4 +106,24 @@ object MqttFlow {
Flow.fromGraph(
new MqttFlowStage(connectionSettings, subscriptions.subscriptions, bufferSize, defaultQos, manualAcks = true)
)

/**
* Create a flow to send messages to MQTT AND subscribe to MQTT messages with a commit handle to acknowledge message reception.
* The acknowledge are fired in both messages
* The materialized value completes on successful connection to the MQTT broker.
*
* @param bufferSize max number of messages read from MQTT before back-pressure applies
* @param defaultQos Quality of service level applied for messages not specifying a message specific value
*/
def atLeastOnceWithAck(connectionSettings: MqttConnectionSettings,
subscriptions: MqttSubscriptions,
bufferSize: Int,
defaultQos: MqttQoS): Flow[MqttMessageWithAck, MqttMessageWithAck, Future[Done]] =
Flow.fromGraph(
nsandroni marked this conversation as resolved.
Show resolved Hide resolved
new MqttFlowStageWithAck(connectionSettings,
subscriptions.subscriptions,
bufferSize,
defaultQos,
manualAcks = true)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,17 @@ object MqttSource {
MqttFlow.atLeastOnce(settings, subscriptions, bufferSize, defaultQos = MqttQoS.AtLeastOnce)
)(Keep.right)

/**
* Create a source subscribing to MQTT messages with a commit handle to acknowledge message reception and message to send.
*
* The materialized value completes on successful connection to the MQTT broker.
*
* @param bufferSize max number of messages read from MQTT before back-pressure applies
*/
nsandroni marked this conversation as resolved.
Show resolved Hide resolved
def atLeastOnceWithAck(settings: MqttConnectionSettings,
subscriptions: MqttSubscriptions,
bufferSize: Int): Source[MqttMessageWithAck, Future[Done]] =
Source.maybe.viaMat(
MqttFlow.atLeastOnceWithAck(settings, subscriptions, bufferSize, defaultQos = MqttQoS.AtLeastOnce)
)(Keep.right)
}
Loading