Skip to content

Commit

Permalink
Spawn MQTT connector actors as system actors
Browse files Browse the repository at this point in the history
  • Loading branch information
2m committed Jul 30, 2019
1 parent c27e2c6 commit fb5b428
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package scaladsl

import java.util.concurrent.atomic.AtomicLong

import akka.actor.ExtendedActorSystem
import akka.actor.typed.internal.adapter.{ActorRefAdapter, PropsAdapter}
import akka.{Done, NotUsed, actor => untyped}
import akka.actor.typed.scaladsl.adapter._
import akka.event.Logging
Expand Down Expand Up @@ -114,21 +116,48 @@ final class ActorMqttClientSession(settings: MqttSessionSettings)(implicit mat:

private val clientSessionId = clientSessionCounter.getAndIncrement()
private val consumerPacketRouter =
system.spawn(RemotePacketRouter[Consumer.Event], "client-consumer-packet-id-allocator-" + clientSessionId)
ActorRefAdapter(
system
.asInstanceOf[ExtendedActorSystem]
.systemActorOf(PropsAdapter(() RemotePacketRouter[Consumer.Event]),
"client-consumer-packet-id-allocator-" + clientSessionId)
)
private val producerPacketRouter =
system.spawn(LocalPacketRouter[Producer.Event], "client-producer-packet-id-allocator-" + clientSessionId)
ActorRefAdapter(
system
.asInstanceOf[ExtendedActorSystem]
.systemActorOf(PropsAdapter(() LocalPacketRouter[Producer.Event]),
"client-producer-packet-id-allocator-" + clientSessionId)
)
private val subscriberPacketRouter =
system.spawn(LocalPacketRouter[Subscriber.Event], "client-subscriber-packet-id-allocator-" + clientSessionId)
ActorRefAdapter(
system
.asInstanceOf[ExtendedActorSystem]
.systemActorOf(PropsAdapter(() LocalPacketRouter[Subscriber.Event]),
"client-subscriber-packet-id-allocator-" + clientSessionId)
)
private val unsubscriberPacketRouter =
system.spawn(LocalPacketRouter[Unsubscriber.Event], "client-unsubscriber-packet-id-allocator-" + clientSessionId)
ActorRefAdapter(
system
.asInstanceOf[ExtendedActorSystem]
.systemActorOf(PropsAdapter(() LocalPacketRouter[Unsubscriber.Event]),
"client-unsubscriber-packet-id-allocator-" + clientSessionId)
)
private val clientConnector =
system.spawn(
ClientConnector(consumerPacketRouter,
producerPacketRouter,
subscriberPacketRouter,
unsubscriberPacketRouter,
settings),
"client-connector-" + clientSessionId
ActorRefAdapter(
system
.asInstanceOf[ExtendedActorSystem]
.systemActorOf(
PropsAdapter(
()
ClientConnector(consumerPacketRouter,
producerPacketRouter,
subscriberPacketRouter,
unsubscriberPacketRouter,
settings)
),
"client-connector-" + clientSessionId
)
)

import MqttCodec._
Expand Down Expand Up @@ -428,22 +457,49 @@ final class ActorMqttServerSession(settings: MqttSessionSettings)(implicit mat:
}

private val consumerPacketRouter =
system.spawn(RemotePacketRouter[Consumer.Event], "server-consumer-packet-id-allocator-" + serverSessionId)
ActorRefAdapter(
system
.asInstanceOf[ExtendedActorSystem]
.systemActorOf(PropsAdapter(() RemotePacketRouter[Consumer.Event]),
"server-consumer-packet-id-allocator-" + serverSessionId)
)
private val producerPacketRouter =
system.spawn(LocalPacketRouter[Producer.Event], "server-producer-packet-id-allocator-" + serverSessionId)
ActorRefAdapter(
system
.asInstanceOf[ExtendedActorSystem]
.systemActorOf(PropsAdapter(() LocalPacketRouter[Producer.Event]),
"server-producer-packet-id-allocator-" + serverSessionId)
)
private val publisherPacketRouter =
system.spawn(RemotePacketRouter[Publisher.Event], "server-publisher-packet-id-allocator-" + serverSessionId)
ActorRefAdapter(
system
.asInstanceOf[ExtendedActorSystem]
.systemActorOf(PropsAdapter(() RemotePacketRouter[Publisher.Event]),
"server-publisher-packet-id-allocator-" + serverSessionId)
)
private val unpublisherPacketRouter =
system.spawn(RemotePacketRouter[Unpublisher.Event], "server-unpublisher-packet-id-allocator-" + serverSessionId)
ActorRefAdapter(
system
.asInstanceOf[ExtendedActorSystem]
.systemActorOf(PropsAdapter(() RemotePacketRouter[Unpublisher.Event]),
"server-unpublisher-packet-id-allocator-" + serverSessionId)
)
private val serverConnector =
system.spawn(
ServerConnector(terminations,
consumerPacketRouter,
producerPacketRouter,
publisherPacketRouter,
unpublisherPacketRouter,
settings),
"server-connector-" + serverSessionId
ActorRefAdapter(
system
.asInstanceOf[ExtendedActorSystem]
.systemActorOf(
PropsAdapter(
()
ServerConnector(terminations,
consumerPacketRouter,
producerPacketRouter,
publisherPacketRouter,
unpublisherPacketRouter,
settings)
),
"server-connector-" + serverSessionId
)
)

import MqttCodec._
Expand Down
14 changes: 8 additions & 6 deletions mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package docs.scaladsl

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.stream.alpakka.mqtt.streaming._
import akka.stream.alpakka.mqtt.streaming.scaladsl.{ActorMqttClientSession, ActorMqttServerSession, Mqtt}
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source, SourceQueueWithComplete, Tcp}
Expand All @@ -19,12 +21,12 @@ import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

class MqttFlowSpec
extends TestKit(ActorSystem("MqttFlowSpec"))
with WordSpecLike
with Matchers
with BeforeAndAfterAll
with ScalaFutures {
class UntypedMqttFlowSpec extends TestKit(ActorSystem("UntypedMqttFlowSpec")) with MqttFlowSpec
class TypedMqttFlowSpec
extends TestKit(akka.actor.typed.ActorSystem(Behaviors.ignore, "TypedMqttFlowSpec").toUntyped)
with MqttFlowSpec

trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll with ScalaFutures { self: TestKit =>

private implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 5.seconds, interval = 100.millis)

Expand Down

0 comments on commit fb5b428

Please sign in to comment.