diff --git a/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/scaladsl/MqttSession.scala b/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/scaladsl/MqttSession.scala index 0f20cfa49e..29413b942f 100644 --- a/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/scaladsl/MqttSession.scala +++ b/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/scaladsl/MqttSession.scala @@ -7,6 +7,8 @@ package scaladsl import java.util.concurrent.atomic.AtomicLong +import akka.actor.ExtendedActorSystem +import akka.actor.typed.internal.adapter.{ActorRefAdapter, PropsAdapter} import akka.{Done, NotUsed, actor => untyped} import akka.actor.typed.scaladsl.adapter._ import akka.event.Logging @@ -114,21 +116,48 @@ final class ActorMqttClientSession(settings: MqttSessionSettings)(implicit mat: private val clientSessionId = clientSessionCounter.getAndIncrement() private val consumerPacketRouter = - system.spawn(RemotePacketRouter[Consumer.Event], "client-consumer-packet-id-allocator-" + clientSessionId) + ActorRefAdapter( + system + .asInstanceOf[ExtendedActorSystem] + .systemActorOf(PropsAdapter(() ⇒ RemotePacketRouter[Consumer.Event]), + "client-consumer-packet-id-allocator-" + clientSessionId) + ) private val producerPacketRouter = - system.spawn(LocalPacketRouter[Producer.Event], "client-producer-packet-id-allocator-" + clientSessionId) + ActorRefAdapter( + system + .asInstanceOf[ExtendedActorSystem] + .systemActorOf(PropsAdapter(() ⇒ LocalPacketRouter[Producer.Event]), + "client-producer-packet-id-allocator-" + clientSessionId) + ) private val subscriberPacketRouter = - system.spawn(LocalPacketRouter[Subscriber.Event], "client-subscriber-packet-id-allocator-" + clientSessionId) + ActorRefAdapter( + system + .asInstanceOf[ExtendedActorSystem] + .systemActorOf(PropsAdapter(() ⇒ LocalPacketRouter[Subscriber.Event]), + "client-subscriber-packet-id-allocator-" + clientSessionId) + ) private val unsubscriberPacketRouter = - system.spawn(LocalPacketRouter[Unsubscriber.Event], "client-unsubscriber-packet-id-allocator-" + clientSessionId) + ActorRefAdapter( + system + .asInstanceOf[ExtendedActorSystem] + .systemActorOf(PropsAdapter(() ⇒ LocalPacketRouter[Unsubscriber.Event]), + "client-unsubscriber-packet-id-allocator-" + clientSessionId) + ) private val clientConnector = - system.spawn( - ClientConnector(consumerPacketRouter, - producerPacketRouter, - subscriberPacketRouter, - unsubscriberPacketRouter, - settings), - "client-connector-" + clientSessionId + ActorRefAdapter( + system + .asInstanceOf[ExtendedActorSystem] + .systemActorOf( + PropsAdapter( + () ⇒ + ClientConnector(consumerPacketRouter, + producerPacketRouter, + subscriberPacketRouter, + unsubscriberPacketRouter, + settings) + ), + "client-connector-" + clientSessionId + ) ) import MqttCodec._ @@ -428,22 +457,49 @@ final class ActorMqttServerSession(settings: MqttSessionSettings)(implicit mat: } private val consumerPacketRouter = - system.spawn(RemotePacketRouter[Consumer.Event], "server-consumer-packet-id-allocator-" + serverSessionId) + ActorRefAdapter( + system + .asInstanceOf[ExtendedActorSystem] + .systemActorOf(PropsAdapter(() ⇒ RemotePacketRouter[Consumer.Event]), + "server-consumer-packet-id-allocator-" + serverSessionId) + ) private val producerPacketRouter = - system.spawn(LocalPacketRouter[Producer.Event], "server-producer-packet-id-allocator-" + serverSessionId) + ActorRefAdapter( + system + .asInstanceOf[ExtendedActorSystem] + .systemActorOf(PropsAdapter(() ⇒ LocalPacketRouter[Producer.Event]), + "server-producer-packet-id-allocator-" + serverSessionId) + ) private val publisherPacketRouter = - system.spawn(RemotePacketRouter[Publisher.Event], "server-publisher-packet-id-allocator-" + serverSessionId) + ActorRefAdapter( + system + .asInstanceOf[ExtendedActorSystem] + .systemActorOf(PropsAdapter(() ⇒ RemotePacketRouter[Publisher.Event]), + "server-publisher-packet-id-allocator-" + serverSessionId) + ) private val unpublisherPacketRouter = - system.spawn(RemotePacketRouter[Unpublisher.Event], "server-unpublisher-packet-id-allocator-" + serverSessionId) + ActorRefAdapter( + system + .asInstanceOf[ExtendedActorSystem] + .systemActorOf(PropsAdapter(() ⇒ RemotePacketRouter[Unpublisher.Event]), + "server-unpublisher-packet-id-allocator-" + serverSessionId) + ) private val serverConnector = - system.spawn( - ServerConnector(terminations, - consumerPacketRouter, - producerPacketRouter, - publisherPacketRouter, - unpublisherPacketRouter, - settings), - "server-connector-" + serverSessionId + ActorRefAdapter( + system + .asInstanceOf[ExtendedActorSystem] + .systemActorOf( + PropsAdapter( + () ⇒ + ServerConnector(terminations, + consumerPacketRouter, + producerPacketRouter, + publisherPacketRouter, + unpublisherPacketRouter, + settings) + ), + "server-connector-" + serverSessionId + ) ) import MqttCodec._ diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala index ad5d1c44e0..b4dba888d7 100644 --- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala +++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala @@ -6,6 +6,8 @@ package docs.scaladsl import akka.{Done, NotUsed} import akka.actor.ActorSystem +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter._ import akka.stream.alpakka.mqtt.streaming._ import akka.stream.alpakka.mqtt.streaming.scaladsl.{ActorMqttClientSession, ActorMqttServerSession, Mqtt} import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source, SourceQueueWithComplete, Tcp} @@ -19,12 +21,12 @@ import org.scalatest.concurrent.ScalaFutures import scala.concurrent.{ExecutionContext, Future, Promise} import scala.concurrent.duration._ -class MqttFlowSpec - extends TestKit(ActorSystem("MqttFlowSpec")) - with WordSpecLike - with Matchers - with BeforeAndAfterAll - with ScalaFutures { +class UntypedMqttFlowSpec extends TestKit(ActorSystem("UntypedMqttFlowSpec")) with MqttFlowSpec +class TypedMqttFlowSpec + extends TestKit(akka.actor.typed.ActorSystem(Behaviors.ignore, "TypedMqttFlowSpec").toUntyped) + with MqttFlowSpec + +trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll with ScalaFutures { self: TestKit => private implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 5.seconds, interval = 100.millis)