Skip to content

Commit

Permalink
AMQP: Fix connection retrieval #3023 (#3025)
Browse files Browse the repository at this point in the history
* Fix for #3023
Offer new cached connection when previous one has been closed and released.
moved local variable into method (enable Scala 3 tail recursion).
updated validation in conn provider test.
  • Loading branch information
bturos authored Oct 20, 2023
1 parent 0f18616 commit a499d43
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -367,13 +367,14 @@ final class AmqpCachedConnectionProvider private (val provider: AmqpConnectionPr
def withAutomaticRelease(automaticRelease: Boolean): AmqpCachedConnectionProvider =
copy(automaticRelease = automaticRelease)

private lazy val connection = provider.get
private def getConnection = provider.get

@tailrec
override def get: Connection = state.get match {
case Empty =>
if (state.compareAndSet(Empty, Connecting)) {
try {
val connection = getConnection
if (!state.compareAndSet(Connecting, Connected(connection, 1)))
throw new ConcurrentModificationException(
"Unexpected concurrent modification while creating the connection."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,23 @@ class AmqpConnectionProvidersSpec extends AmqpSpec {
catch { case e: Throwable => e shouldBe an[ConnectException] }
}
}

"The AMQP Reusable Connection Provider" should {
"open new connection when previous one is forced to close and released" in {
val connectionFactory = new ConnectionFactory()
val connectionProvider = AmqpConnectionFactoryConnectionProvider(connectionFactory)
.withHostAndPort("localhost", 5672)
val reusableConnectionProvider = AmqpCachedConnectionProvider(connectionProvider)
val originalConnection = reusableConnectionProvider.get

originalConnection.isOpen shouldBe true
originalConnection.abort(1, "Forced close")
reusableConnectionProvider.release(originalConnection)

val newConnection = reusableConnectionProvider.get
newConnection should not be (originalConnection)
newConnection.isOpen shouldBe true
originalConnection.isOpen should not be true
}
}
}

0 comments on commit a499d43

Please sign in to comment.