Skip to content

Commit

Permalink
Merge pull request #114 from prettynatty/mqtt-options
Browse files Browse the repository at this point in the history
expose cleanSession and will mqtt connection options
  • Loading branch information
patriknw authored Jan 13, 2017
2 parents 93453a7 + 7d793b8 commit a0dccea
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 4 deletions.
1 change: 0 additions & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,5 @@ docstrings = JavaDoc
indentOperator = spray
maxColumn = 120
rewrite.rules = [RedundantBraces, RedundantParens, SortImports]
spaces.inImportCurlyBraces = true
unindentTopLevelOperators = true
binPack.callSite = true
19 changes: 18 additions & 1 deletion mqtt/src/main/scala/akka/stream/alpakka/mqtt/Mqtt.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,22 @@ final case class MqttConnectionSettings(
broker: String,
clientId: String,
persistence: MqttClientPersistence,
auth: Option[(String, String)] = None
auth: Option[(String, String)] = None,
cleanSession: Boolean = true,
will: Option[Will] = None
) {
def withBroker(broker: String) =
copy(broker = broker)

def withAuth(username: String, password: String) =
copy(auth = Some((username, password)))

def withCleanSession(cleanSession: Boolean) =
copy(cleanSession = cleanSession)

def withWill(will: Will) =
copy(will = Some(will))

def withClientId(clientId: String) =
copy(clientId = clientId)
}
Expand All @@ -93,6 +104,8 @@ object MqttConnectionSettings {

final case class MqttMessage(topic: String, payload: ByteString)

final case class Will(message: MqttMessage, qos: MqttQoS, retained: Boolean)

object MqttMessage {

/**
Expand Down Expand Up @@ -146,6 +159,10 @@ private[mqtt] trait MqttConnectorLogic { this: GraphStageLogic =>
connectOptions.setUserName(user)
connectOptions.setPassword(password.toCharArray)
}
connectionSettings.will.foreach { will =>
connectOptions.setWill(will.message.topic, will.message.payload.toArray, will.qos.byteValue.toInt, will.retained)
}
connectOptions.setCleanSession(connectionSettings.cleanSession)
client.connect(connectOptions, (), connectHandler)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package akka.stream.alpakka.mqtt.scaladsl

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.alpakka.mqtt.{ MqttConnectionSettings, MqttMessage, MqttQoS, MqttSourceSettings }
import akka.stream.alpakka.mqtt._
import akka.stream.scaladsl._
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestKit
Expand Down Expand Up @@ -41,6 +41,7 @@ class MqttSourceSpec
val topic1 = "source-spec/topic1"
val topic2 = "source-spec/topic2"
val secureTopic = "source-spec/secure-topic1"
val willTopic = "source-spec/will"

val sourceSettings = connectionSettings.withClientId(clientId = "source-spec/source")
val sinkSettings = connectionSettings.withClientId(clientId = "source-spec/sink")
Expand Down Expand Up @@ -169,5 +170,42 @@ class MqttSourceSpec
elem2.futureValue shouldBe MqttMessage(topic1, ByteString("ohi"))
}
}

"support will message" in {
import system.dispatcher

val (binding, connection) = Tcp().bind("localhost", 1337).toMat(Sink.head)(Keep.both).run()

val ks = connection.map(
_.handleWith(Tcp().outgoingConnection("localhost", 1883).viaMat(KillSwitches.single)(Keep.right))
)

whenReady(binding) { _ =>
val settings = MqttSourceSettings(
sourceSettings
.withClientId("source-spec/testator")
.withBroker("tcp://localhost:1337")
.withWill(Will(MqttMessage(willTopic, ByteString("ohi")), MqttQoS.AtLeastOnce, retained = true)),
Map(willTopic -> MqttQoS.AtLeastOnce))
val source = MqttSource(settings, 8)

val sub = source.toMat(Sink.head)(Keep.left).run()
whenReady(sub) { _ =>
whenReady(ks)(_.shutdown())
}
}

{
val settings =
MqttSourceSettings(sourceSettings.withClientId("source-spec/executor"),
Map(willTopic -> MqttQoS.AtLeastOnce))
val source = MqttSource(settings, 8)

val (sub, elem) = source.toMat(Sink.head)(Keep.both).run()
whenReady(sub) { _ =>
elem.futureValue shouldBe MqttMessage(willTopic, ByteString("ohi"))
}
}
}
}
}
3 changes: 2 additions & 1 deletion mqtt/src/test/travis/acl
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# anonymous user
topic source-spec/topic1
topic source-spec/topic2
topic source-spec/will
topic sink-spec/topic1
topic source-test/topic1
topic source-test/topic2

user username1
topic source-spec/secure-topic1
topic sink-spec/secure-topic1
topic sink-spec/secure-topic1

0 comments on commit a0dccea

Please sign in to comment.