Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt connector does not work with Akka Typed #1836

Closed
ytaras opened this issue Jul 23, 2019 · 8 comments · Fixed by #1848
Closed

Mqtt connector does not work with Akka Typed #1836

ytaras opened this issue Jul 23, 2019 · 8 comments · Fixed by #1848
Milestone

Comments

@ytaras
Copy link

ytaras commented Jul 23, 2019

Versions used

Akka version: 2.5.23
Alpakka MQTT Streaming: 1.1.0

Expected Behavior

I am trying to use MQTT Streaming in a project which is built on top of Akka Typed API.

Actual Behavior

If I try to pass ActorSystem to ActorMqttSession, it fails with exception:

Exception in thread "main" java.lang.UnsupportedOperationException: cannot create top-level actor [client-consumer-packet-id-allocator-0] from the outside on ActorSystem with custom user guardian
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:780)
at akka.actor.typed.internal.adapter.ActorRefFactoryAdapter$.spawn(ActorRefFactoryAdapter.scala:40)
at akka.actor.typed.scaladsl.adapter.package$UntypedActorSystemOps$.spawn$extension(package.scala:67)
at akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession.<init>(MqttSession.scala:117)
at akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession$.apply(MqttSession.scala:82)
at Bug$.delayedEndpoint$Bug$1(Bug.scala:12)
at Bug$delayedInit$body.apply(Bug.scala:7)
at scala.Function0.apply$mcV$sp(Function0.scala:39)
at scala.Function0.apply$mcV$sp$(Function0.scala:39)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
at scala.App.$anonfun$main$1(App.scala:75)
at scala.App.$anonfun$main$1$adapted(App.scala:75)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
at scala.collection.AbstractIterable.foreach(Iterable.scala:904)
at scala.App.main(App.scala:75)
at scala.App.main$(App.scala:73)
at Bug$.main(Bug.scala:7)
at Bug.main(Bug.scala)

Reproducible Test Case

It boils down to following code:

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.alpakka.mqtt.streaming.MqttSessionSettings
import akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession
import akka.stream.typed.scaladsl.ActorMaterializer

object Bug extends App{
  implicit val system = ActorSystem(Behaviors.ignore, "TopLevel")
  implicit val am = ActorMaterializer()

  val settings = MqttSessionSettings()
  val session = ActorMqttClientSession(settings)(am, am.system)

}
@huntc
Copy link
Contributor

huntc commented Jul 24, 2019

Sounds more like a bug report for the Akka team. akka-mqtt-streaming uses Akka Typed under the hood, so may be there's some incompatibility when going from typed to classic to typed...

@ytaras
Copy link
Author

ytaras commented Jul 24, 2019

Sounds more like a bug report for the Akka team. akka-mqtt-streaming uses Akka Typed under the hood, so may be there's some incompatibility when going from typed to classic to typed...

Thanks for pointing that out. Yes, I can reproduce it by using this sample code:

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._

object Bug2 extends App {
  val system = ActorSystem(Behaviors.ignore, "TopLevel")
  system.toUntyped.spawn(Behaviors.ignore, "Another")
}

However I am hesitant about opening an issue to Akka team yet. I am learning Akka yet so I am not sure if this pattern is considered a good practice. We are trying to create new actor on a typed actor system bypassing guardian actor and typed API is designed to make it harder on purpose.

I think intended way of implementing this is creating system actors:

import akka.actor.typed.{ActorRef, ActorSystem}
import akka.actor.typed.scaladsl.Behaviors

import scala.util.Try

object Alternative extends App {
  import scala.concurrent.duration._
  val system = ActorSystem(Behaviors.ignore, "TopLevel")
  import system.executionContext
  system.systemActorOf(Behaviors.ignore, "Another")(10.seconds)
    .onComplete { x: Try[ActorRef[Nothing]] => println(s"Actor created - $x") }
}

So probably it's better to create alternative constructor for ActorMqtttClientSession, which accepts typed ActorMaterializer or Materializer + typed.ActorSystem pair and uses system actors API in that case. WDYT?

@huntc
Copy link
Contributor

huntc commented Jul 24, 2019

I don't really know.

Maybe it'd be useful to discuss this particular issue at https://discuss.lightbend.com/c/akka first.

@2m
Copy link
Member

2m commented Jul 26, 2019

Thanks for starting the discussion in the forums. Now it is clear that system.spawn needs to be replaced with system.systemActorOf so the connector works when typed actor system is provided.

@ytaras would you be willing to create a PR?

@huntc
Copy link
Contributor

huntc commented Jul 28, 2019

I had a stab at this today. It is tricky as the typed system returns a future of actor ref. I’m not sure how you can convert that to just an actor ref as per untyped. Looking for inspiration.

@ytaras
Copy link
Author

ytaras commented Jul 29, 2019

I'm sorry for late reply.
I think I can help with it, but it would take same time for me to get familiar with existing code.
Regarding the future part - I see 2 ways:

  1. Use Await to get result from future. I think we all agree it's not that good from design point of view, but much simpler
  2. Use futures all the way down in underlying API, so we could create actors using Future.succesfull(actorRef) in untyped scenario and system.systemActorOf in typed scenario.

Please let me know preferred way of implementation and if you want me to work on a PR for this.

@2m
Copy link
Member

2m commented Jul 30, 2019

I have attempted to fix this in #1848 which copies the adapter code from akka but does not wrap the resulting ActorRef with a Future. Ideally this should be fixed in akka/akka as I currently do not see a reason why wrapping in Future is needed.

@2m
Copy link
Member

2m commented Jul 30, 2019

Created a ticket to fix this in akka/akka: akka/akka#27437

@2m 2m closed this as completed in #1848 Jul 30, 2019
@ennru ennru added this to the 1.1.1 milestone Aug 5, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants