diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala index b4dba888d7..658567c3ce 100644 --- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala +++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala @@ -21,12 +21,19 @@ import org.scalatest.concurrent.ScalaFutures import scala.concurrent.{ExecutionContext, Future, Promise} import scala.concurrent.duration._ -class UntypedMqttFlowSpec extends TestKit(ActorSystem("UntypedMqttFlowSpec")) with MqttFlowSpec +class UntypedMqttFlowSpec + extends ParametrizedTestKit("untyped-flow-spec/flow", "typed-flow-spec/topic1", ActorSystem("UntypedMqttFlowSpec")) + with MqttFlowSpec class TypedMqttFlowSpec - extends TestKit(akka.actor.typed.ActorSystem(Behaviors.ignore, "TypedMqttFlowSpec").toUntyped) + extends ParametrizedTestKit("typed-flow-spec/flow", + "typed-flow-spec/topic1", + akka.actor.typed.ActorSystem(Behaviors.ignore, "TypedMqttFlowSpec").toUntyped) with MqttFlowSpec -trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll with ScalaFutures { self: TestKit => +class ParametrizedTestKit(val clientId: String, val topic: String, system: ActorSystem) extends TestKit(system) + +trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll with ScalaFutures { + self: ParametrizedTestKit => private implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 5.seconds, interval = 100.millis) @@ -38,9 +45,6 @@ trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll wit "mqtt client flow" should { "establish a bidirectional connection and subscribe to a topic" in assertAllStagesStopped { - val clientId = "source-spec/flow" - val topic = "source-spec/topic1" - //#create-streaming-flow val settings = MqttSessionSettings() val session = ActorMqttClientSession(settings) @@ -87,10 +91,7 @@ trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll wit "mqtt server flow" should { "receive a bidirectional connection and a subscription to a topic" in assertAllStagesStopped { - val clientId = "flow-spec/flow" - val topic = "source-spec/topic1" val host = "localhost" - val port = 9883 //#create-streaming-bind-flow val settings = MqttSessionSettings() @@ -100,7 +101,7 @@ trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll wit val bindSource: Source[Either[MqttCodec.DecodeError, Event[Nothing]], Future[Tcp.ServerBinding]] = Tcp() - .bind(host, port) + .bind(host, 0) .flatMapMerge( maxConnections, { connection => val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] = @@ -141,10 +142,11 @@ trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll wit .run() //#run-streaming-bind-flow - bound.futureValue.localAddress.getPort shouldBe port + val binding = bound.futureValue + binding.localAddress.getPort should not be 0 val clientSession = ActorMqttClientSession(settings) - val connection = Tcp().outgoingConnection(host, port) + val connection = Tcp().outgoingConnection(host, binding.localAddress.getPort) val mqttFlow = Mqtt.clientSessionFlow(clientSession, ByteString("1")).join(connection) val (commands, events) = Source diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala index cc3beae6a4..8b8816d8a3 100644 --- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala +++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala @@ -513,6 +513,9 @@ class MqttSessionSpec } "receive a QoS 1 publication from a subscribed topic and ack it and then ack it again - the stream should ignore" in assertAllStagesStopped { + // longer patience needed since Akka 2.6 + implicit val patienceConfig = PatienceConfig(scaled(1.second), scaled(50.millis)) + val session = ActorMqttClientSession(settings) val server = TestProbe() diff --git a/mqtt/src/test/travis/acl b/mqtt/src/test/travis/acl index c9a23420ac..340bab3d31 100644 --- a/mqtt/src/test/travis/acl +++ b/mqtt/src/test/travis/acl @@ -14,6 +14,8 @@ topic source-test/manualacks topic source-test/pendingacks topic flow-spec/topic-ack topic flow-test/topic-ack +topic typed-flow-spec/topic1 +topic untyped-flow-spec/topic1 user username1 topic source-spec/secure-topic1