Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jms: close sessions on connection loss #3041

Merged
merged 3 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ private[jms] trait JmsConnector[S <: JmsSession] extends TimerGraphStageLogic wi
}

private def handleRetriableException(ex: Throwable): Unit = {
jmsSessions = Seq.empty
closeSessions()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the bug fix


connectionState match {
case JmsConnectorInitializing(_, attempt, backoffMaxed, _) =>
maybeReconnect(ex, attempt, backoffMaxed)
Expand Down Expand Up @@ -270,16 +271,17 @@ private[jms] trait JmsConnector[S <: JmsSession] extends TimerGraphStageLogic wi
eventualConnection.map(closeConnection).map(_ => Done)

protected def closeSessions(): Unit = {
jmsSessions.foreach(s => closeSession(s))
jmsSessions.foreach(closeSession)
jmsSessions = Seq.empty
}

protected def closeSessionsAsync(): Future[Unit] = {
val closing = Future
.sequence {
jmsSessions.map(s => Future(closeSession(s)))
jmsSessions.map(s => Future { closeSession(s) })
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a strong preference for curly braces around Future.apply calls to make them visually stand out.

}
.map(_ => ())
.flatMap(_ => Future.unit)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this was written before Future.unit existed, but since we're optimizing memory...


jmsSessions = Seq.empty
closing
}
leviramsey marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,48 +42,39 @@ private[jms] final class JmsTxSourceStage(settings: JmsConsumerSettings, destina

protected def pushMessage(msg: TxEnvelope): Unit = push(out, msg)

override protected def onSessionOpened(jmsSession: JmsConsumerSession): Unit =
jmsSession match {
case session: JmsSession =>
session
.createConsumer(settings.selector)
.map { consumer =>
consumer.setMessageListener(new jms.MessageListener {

def onMessage(message: jms.Message): Unit =
try {
val envelope = TxEnvelope(message, session)
handleMessage.invoke(envelope)
try {
// JMS spec defines that commit/rollback must be done on the same thread.
// While some JMS implementations work without this constraint, IBM MQ is
// very strict about the spec and throws exceptions when called from a different thread.
val action = Await.result(envelope.commitFuture, settings.ackTimeout)
action()
} catch {
case _: TimeoutException =>
val exception = new JmsTxAckTimeout(settings.ackTimeout)
session.session.rollback()
if (settings.failStreamOnAckTimeout) {
handleError.invoke(exception)
} else {
log.warning(exception.getMessage)
}
override protected def onSessionOpened(consumerSession: JmsConsumerSession): Unit =
consumerSession
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clearer code through trusting the Scala compiler...

.createConsumer(settings.selector)
.map { consumer =>
consumer.setMessageListener(new jms.MessageListener {
def onMessage(message: jms.Message): Unit = {
try {
val envelope = TxEnvelope(message, consumerSession)
handleMessage.invoke(envelope)
try {
// JMS spec defines that commit/rollback must be done on the same thread.
// While some JMS implementations work without this constraint, IBM MQ is
// very strict about the spec and throws exceptions when called from a different thread.
val action = Await.result(envelope.commitFuture, settings.ackTimeout)
action()
} catch {
case _: TimeoutException =>
val exception = new JmsTxAckTimeout(settings.ackTimeout)
consumerSession.session.rollback()
if (settings.failStreamOnAckTimeout) {
handleError.invoke(exception)
} else {
log.warning(exception.getMessage)
}
} catch {
case e: IllegalArgumentException => handleError.invoke(e) // Invalid envelope. Fail the stage.
case e: jms.JMSException => handleError.invoke(e)
}
})
}
} catch {
case e: IllegalArgumentException => handleError.invoke(e) // Invalid envelope, fail the stage
case e: jms.JMSException => handleError.invoke(e)
}
}
.onComplete(sessionOpenedCB.invoke)

case _ =>
throw new IllegalArgumentException(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was wondering why the compiler wasn't warning this was unreachable... turns out that a null would be the only value which hits this... the new code would NPE on createConsumer which seems to be preferable (you go straight to "why was the argument null").

Scala 3 should complain about this case being unreachable (with the warning saying that it's only reachable in the null case, so recommend explicitly matching null).

"Session must be of type JmsSession, it is a " +
jmsSession.getClass.getName
)
}
})
}
.onComplete(sessionOpenedCB.invoke)
}

}