diff --git a/.scalafmt.conf b/.scalafmt.conf index 21fda04f76..477b081967 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -6,6 +6,5 @@ docstrings = JavaDoc indentOperator = spray maxColumn = 120 rewrite.rules = [RedundantBraces, RedundantParens, SortImports] -spaces.inImportCurlyBraces = true unindentTopLevelOperators = true binPack.callSite = true diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/Mqtt.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/Mqtt.scala index d34018a3ad..0c76e64e88 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/Mqtt.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/Mqtt.scala @@ -73,11 +73,22 @@ final case class MqttConnectionSettings( broker: String, clientId: String, persistence: MqttClientPersistence, - auth: Option[(String, String)] = None + auth: Option[(String, String)] = None, + cleanSession: Boolean = true, + will: Option[Will] = None ) { + def withBroker(broker: String) = + copy(broker = broker) + def withAuth(username: String, password: String) = copy(auth = Some((username, password))) + def withCleanSession(cleanSession: Boolean) = + copy(cleanSession = cleanSession) + + def withWill(will: Will) = + copy(will = Some(will)) + def withClientId(clientId: String) = copy(clientId = clientId) } @@ -93,6 +104,8 @@ object MqttConnectionSettings { final case class MqttMessage(topic: String, payload: ByteString) +final case class Will(message: MqttMessage, qos: MqttQoS, retained: Boolean) + object MqttMessage { /** @@ -146,6 +159,10 @@ private[mqtt] trait MqttConnectorLogic { this: GraphStageLogic => connectOptions.setUserName(user) connectOptions.setPassword(password.toCharArray) } + connectionSettings.will.foreach { will => + connectOptions.setWill(will.message.topic, will.message.payload.toArray, will.qos.byteValue.toInt, will.retained) + } + connectOptions.setCleanSession(connectionSettings.cleanSession) client.connect(connectOptions, (), connectHandler) } diff --git a/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSourceSpec.scala b/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSourceSpec.scala index c84e2a71cc..c9776d2a77 100644 --- a/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSourceSpec.scala +++ b/mqtt/src/test/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSourceSpec.scala @@ -5,7 +5,7 @@ package akka.stream.alpakka.mqtt.scaladsl import akka.actor.ActorSystem import akka.stream._ -import akka.stream.alpakka.mqtt.{ MqttConnectionSettings, MqttMessage, MqttQoS, MqttSourceSettings } +import akka.stream.alpakka.mqtt._ import akka.stream.scaladsl._ import akka.stream.testkit.scaladsl.TestSink import akka.testkit.TestKit @@ -41,6 +41,7 @@ class MqttSourceSpec val topic1 = "source-spec/topic1" val topic2 = "source-spec/topic2" val secureTopic = "source-spec/secure-topic1" + val willTopic = "source-spec/will" val sourceSettings = connectionSettings.withClientId(clientId = "source-spec/source") val sinkSettings = connectionSettings.withClientId(clientId = "source-spec/sink") @@ -169,5 +170,42 @@ class MqttSourceSpec elem2.futureValue shouldBe MqttMessage(topic1, ByteString("ohi")) } } + + "support will message" in { + import system.dispatcher + + val (binding, connection) = Tcp().bind("localhost", 1337).toMat(Sink.head)(Keep.both).run() + + val ks = connection.map( + _.handleWith(Tcp().outgoingConnection("localhost", 1883).viaMat(KillSwitches.single)(Keep.right)) + ) + + whenReady(binding) { _ => + val settings = MqttSourceSettings( + sourceSettings + .withClientId("source-spec/testator") + .withBroker("tcp://localhost:1337") + .withWill(Will(MqttMessage(willTopic, ByteString("ohi")), MqttQoS.AtLeastOnce, retained = true)), + Map(willTopic -> MqttQoS.AtLeastOnce)) + val source = MqttSource(settings, 8) + + val sub = source.toMat(Sink.head)(Keep.left).run() + whenReady(sub) { _ => + whenReady(ks)(_.shutdown()) + } + } + + { + val settings = + MqttSourceSettings(sourceSettings.withClientId("source-spec/executor"), + Map(willTopic -> MqttQoS.AtLeastOnce)) + val source = MqttSource(settings, 8) + + val (sub, elem) = source.toMat(Sink.head)(Keep.both).run() + whenReady(sub) { _ => + elem.futureValue shouldBe MqttMessage(willTopic, ByteString("ohi")) + } + } + } } } diff --git a/mqtt/src/test/travis/acl b/mqtt/src/test/travis/acl index b305bd7998..ee1d4bb534 100644 --- a/mqtt/src/test/travis/acl +++ b/mqtt/src/test/travis/acl @@ -1,10 +1,11 @@ # anonymous user topic source-spec/topic1 topic source-spec/topic2 +topic source-spec/will topic sink-spec/topic1 topic source-test/topic1 topic source-test/topic2 user username1 topic source-spec/secure-topic1 -topic sink-spec/secure-topic1 \ No newline at end of file +topic sink-spec/secure-topic1