Skip to content

Commit

Permalink
Merge pull request #1849 from 2m/wip-mqtt-separate-flow-spec-2m
Browse files Browse the repository at this point in the history
Make typed/untyped MqttFlowSpec independent
  • Loading branch information
2m authored Jul 31, 2019
2 parents 31866fd + 9ae6cd8 commit 569f2aa
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 12 deletions.
26 changes: 14 additions & 12 deletions mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,19 @@ import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

class UntypedMqttFlowSpec extends TestKit(ActorSystem("UntypedMqttFlowSpec")) with MqttFlowSpec
class UntypedMqttFlowSpec
extends ParametrizedTestKit("untyped-flow-spec/flow", "typed-flow-spec/topic1", ActorSystem("UntypedMqttFlowSpec"))
with MqttFlowSpec
class TypedMqttFlowSpec
extends TestKit(akka.actor.typed.ActorSystem(Behaviors.ignore, "TypedMqttFlowSpec").toUntyped)
extends ParametrizedTestKit("typed-flow-spec/flow",
"typed-flow-spec/topic1",
akka.actor.typed.ActorSystem(Behaviors.ignore, "TypedMqttFlowSpec").toUntyped)
with MqttFlowSpec

trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll with ScalaFutures { self: TestKit =>
class ParametrizedTestKit(val clientId: String, val topic: String, system: ActorSystem) extends TestKit(system)

trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll with ScalaFutures {
self: ParametrizedTestKit =>

private implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 5.seconds, interval = 100.millis)

Expand All @@ -38,9 +45,6 @@ trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll wit

"mqtt client flow" should {
"establish a bidirectional connection and subscribe to a topic" in assertAllStagesStopped {
val clientId = "source-spec/flow"
val topic = "source-spec/topic1"

//#create-streaming-flow
val settings = MqttSessionSettings()
val session = ActorMqttClientSession(settings)
Expand Down Expand Up @@ -87,10 +91,7 @@ trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll wit

"mqtt server flow" should {
"receive a bidirectional connection and a subscription to a topic" in assertAllStagesStopped {
val clientId = "flow-spec/flow"
val topic = "source-spec/topic1"
val host = "localhost"
val port = 9883

//#create-streaming-bind-flow
val settings = MqttSessionSettings()
Expand All @@ -100,7 +101,7 @@ trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll wit

val bindSource: Source[Either[MqttCodec.DecodeError, Event[Nothing]], Future[Tcp.ServerBinding]] =
Tcp()
.bind(host, port)
.bind(host, 0)
.flatMapMerge(
maxConnections, { connection =>
val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
Expand Down Expand Up @@ -141,10 +142,11 @@ trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll wit
.run()
//#run-streaming-bind-flow

bound.futureValue.localAddress.getPort shouldBe port
val binding = bound.futureValue
binding.localAddress.getPort should not be 0

val clientSession = ActorMqttClientSession(settings)
val connection = Tcp().outgoingConnection(host, port)
val connection = Tcp().outgoingConnection(host, binding.localAddress.getPort)
val mqttFlow = Mqtt.clientSessionFlow(clientSession, ByteString("1")).join(connection)
val (commands, events) =
Source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,9 @@ class MqttSessionSpec
}

"receive a QoS 1 publication from a subscribed topic and ack it and then ack it again - the stream should ignore" in assertAllStagesStopped {
// longer patience needed since Akka 2.6
implicit val patienceConfig = PatienceConfig(scaled(1.second), scaled(50.millis))

val session = ActorMqttClientSession(settings)

val server = TestProbe()
Expand Down
2 changes: 2 additions & 0 deletions mqtt/src/test/travis/acl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ topic source-test/manualacks
topic source-test/pendingacks
topic flow-spec/topic-ack
topic flow-test/topic-ack
topic typed-flow-spec/topic1
topic untyped-flow-spec/topic1

user username1
topic source-spec/secure-topic1
Expand Down

0 comments on commit 569f2aa

Please sign in to comment.