AMQP cached connection provider does not reconnect #3023

Closed
bturos opened this issue Oct 13, 2023 · 2 comments

@bturos
Contributor

bturos commented Oct 13, 2023

Hi! First of all, thanks for all the hard work on developing Alpakka. We use it on a daily basis in our project and it's very useful 🙂
Also, this is my first issue submission, so if anything is wrong or missing, please let me know.

Versions used

Akka version: 2.8.4
Alpakka AMQP: 6.0.2

Expected Behavior

With a restartable AMQP source, when the server closes the connection, the client should attempt to reconnect.

Actual Behavior

When using AmqpCachedConnectionProvider, once the server closes the connection, the client is unable to reconnect properly.

Relevant logs

09:58:01.382 [test-akka.actor.default-dispatcher-5] WARN  a.s.s.RestartWithBackoffSource - Restarting stream due to failure [5]: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
	at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:175)
	at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:615)
	at akka.stream.alpakka.amqp.impl.AmqpConnectorLogic.preStart(AmqpConnectorLogic.scala:30)
	at akka.stream.alpakka.amqp.impl.AmqpConnectorLogic.preStart$(AmqpConnectorLogic.scala:27)
	at akka.stream.alpakka.amqp.impl.AmqpSourceStage$$anon$1.preStart(AmqpSourceStage.scala:44)
	at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:309)
	at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:619)
	at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:727)
	at akka.stream.impl.fusing.ActorGraphInterpreter.finishShellRegistration(ActorGraphInterpreter.scala:770)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:788)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:819)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

Reproducible Test Case

The simplest test case I could come up with is the following (note: I tested with a local RabbitMQ instance):

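import akka.actor.ActorSystem
import akka.stream.{KillSwitches, RestartSettings}
import akka.stream.alpakka.amqp.{
  AmqpCachedConnectionProvider,
  AmqpCredentials,
  AmqpDetailsConnectionProvider,
  NamedQueueSourceSettings
}
import akka.stream.alpakka.amqp.scaladsl.AmqpSource
import akka.stream.scaladsl.{Keep, RestartSource, Sink}

import scala.concurrent.duration._

implicit val actorSystem: ActorSystem = ActorSystem("test")

// Cached provider wrapping a plain connection to the local broker.
// Client-side recovery is disabled, so reconnecting is left to RestartSource.
val amqpConnectionProvider: AmqpCachedConnectionProvider =
  AmqpCachedConnectionProvider(
    AmqpDetailsConnectionProvider("localhost", 5672)
      .withConnectionName("myConnection")
      .withAutomaticRecoveryEnabled(false)
      .withTopologyRecoveryEnabled(false)
      .withCredentials(AmqpCredentials("guest", "guest"))
  )

val source = AmqpSource.committableSource(
  NamedQueueSourceSettings(
    amqpConnectionProvider,
    "myQueue"
  ),
  bufferSize = 10
)

// minBackoff = 1 second, maxBackoff = 5 seconds, randomFactor = 0.2
val backoffSettings: RestartSettings = RestartSettings(1.second, 5.seconds, 0.2)

RestartSource
  .withBackoff(backoffSettings) { () =>
    // Printed on every (re)start, so repeated output shows the restart loop
    println("Restarting AMQP source")
    source.map(_ => ())
  }
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(Sink.ignore)(Keep.left)
  .run()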

I think this is a regression introduced here: https://github.com/akka/alpakka/pull/2959/files
It looks like, after this update, the internal provider is never asked to open a new connection again. I might be misunderstanding the idea though, so feel free to correct me.
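
To make the suspected failure mode concrete, here is a toy model of it. It does not use Alpakka or the RabbitMQ client at all: `FakeConnection`, `Provider`, `FreshProvider` and `StickyCache` are hypothetical names invented for this illustration, and the real provider's internals may well differ. It only shows how a cache that never goes back to the underlying provider keeps handing a closed connection to every restarted stage.

```scala
// Toy model only: none of these types are Alpakka or RabbitMQ client classes.
final class FakeConnection(val id: Int) {
  @volatile private var open = true
  def isOpen: Boolean = open
  def close(): Unit = { open = false }
  // Mirrors the symptom in the logs: a closed connection cannot create channels.
  def createChannel(): String =
    if (open) s"channel-on-$id"
    else throw new IllegalStateException(s"connection $id is already closed")
}

trait Provider {
  def get(): FakeConnection
}

// Underlying provider: opens a fresh connection on every call.
final class FreshProvider extends Provider {
  private var counter = 0
  def get(): FakeConnection = {
    counter += 1
    new FakeConnection(counter)
  }
}

// Suspected failure mode: the first connection is cached forever,
// so the underlying provider is asked exactly once.
final class StickyCache(underlying: Provider) extends Provider {
  private var cached: Option[FakeConnection] = None
  def get(): FakeConnection = cached.getOrElse {
    val connection = underlying.get()
    cached = Some(connection)
    connection
  }
}

object StickyCacheDemo {
  def main(args: Array[String]): Unit = {
    val provider = new StickyCache(new FreshProvider)
    val first = provider.get()
    println(first.createChannel()) // channel-on-1
    first.close()                  // stands in for the broker closing the connection
    val second = provider.get()    // what a restarted stage would receive
    println(second eq first)       // true: the same, already closed, connection
    println(scala.util.Try(second.createChannel()).isFailure) // true
  }
}
```

In this model every restart fails in the same way, which matches the repeated `AlreadyClosedException` from `createChannel` in the logs above, but it is only an analogy for the behaviour I observed.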

ℹ️ The same logic works with Alpakka 5.0

I'll be happy to open a PR for this, if you think this is an actual issue (as a novice here I might need some guidance though 🙂 )

Cheers!

@ennru
Member

ennru commented Oct 17, 2023

Thank you for pointing this misbehaviour out.
Moving the connection field into a local value seems to be the correct thing to do.

@ennru ennru added the p:amqp label Oct 17, 2023
bturos added a commit to bturos/alpakka that referenced this issue Oct 19, 2023
Offer new cached connection when previous one has been closed and released
@bturos bturos mentioned this issue Oct 19, 2023
ennru pushed a commit that referenced this issue Oct 20, 2023
* Fix for #3023
Offer new cached connection when previous one has been closed and released.
moved local variable into method (enable Scala 3 tail recursion).
updated validation in conn provider test.
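
The behaviour described in the commit message (offer a new connection once the previous one has been closed and released) can be sketched roughly as follows. This is a simplified, assumption-laden model and not the code from the fix: `StubConnection`, `State`, `Empty`, `Connected` and `ReopeningCache` are hypothetical names, and the only ideas shown are a compare-and-set loop written as a tail-recursive method and an `isOpen` check before a cached connection is reused.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

// Hypothetical stand-in for a broker connection; not a RabbitMQ client class.
final class StubConnection(val id: Int) {
  @volatile private var open = true
  def isOpen: Boolean = open
  def close(): Unit = { open = false }
}

// Illustrative cache states; the names are assumptions, not Alpakka internals.
sealed trait State
case object Empty extends State
final case class Connected(connection: StubConnection, clients: Int) extends State

final class ReopeningCache(openConnection: () => StubConnection) {
  private val state = new AtomicReference[State](Empty)

  // Reuse the cached connection only while it is open;
  // otherwise ask the factory for a new one and cache that instead.
  @tailrec
  def get(): StubConnection = state.get() match {
    case current @ Connected(connection, clients) if connection.isOpen =>
      if (state.compareAndSet(current, Connected(connection, clients + 1))) connection
      else get() // lost a race, retry
    case current =>
      val fresh = openConnection()
      if (state.compareAndSet(current, Connected(fresh, 1))) fresh
      else {
        fresh.close() // lost a race, discard the extra connection and retry
        get()
      }
  }

  // Drop one client; the last client to release closes the connection.
  @tailrec
  def release(connection: StubConnection): Unit = state.get() match {
    case current @ Connected(cached, clients) if cached eq connection =>
      val next: State = if (clients > 1) Connected(cached, clients - 1) else Empty
      if (state.compareAndSet(current, next)) {
        if (next == Empty) cached.close()
      } else release(connection)
    case _ => () // already replaced or never cached: nothing to do
  }
}

object ReopeningCacheDemo {
  def main(args: Array[String]): Unit = {
    var opened = 0
    val cache = new ReopeningCache(() => { opened += 1; new StubConnection(opened) })
    val first = cache.get()
    first.close()            // stands in for the broker forcing the connection closed
    cache.release(first)     // the failed stage releases its connection
    val second = cache.get() // the restarted stage asks again
    println(second.isOpen)   // true
    println(second.id)       // 2
  }
}
```

Keeping both methods self-recursive in tail position is what lets `@tailrec` verify them; whether the actual fix is structured this way is not something this thread states.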
@ennru
Member

ennru commented Oct 20, 2023

Fixed with #3025

@ennru ennru closed this as completed Oct 20, 2023