Skip to content

Commit

Permalink
Rename setting
Browse files Browse the repository at this point in the history
  • Loading branch information
ennru committed Aug 7, 2019
1 parent e1f8ec0 commit f945041
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 13 deletions.
2 changes: 1 addition & 1 deletion jms/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ alpakka.jms {
ack-timeout = 1 second
# For use with transactions, if true the stream fails if Alpakka rolls back the transaction
# when `ack-timeout` is hit.
fail-on-ack-timeout = false
fail-stream-on-ack-timeout = false
}
#consumer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ final class JmsConsumerSettings private (
val selector: Option[String],
val acknowledgeMode: Option[AcknowledgeMode],
val ackTimeout: scala.concurrent.duration.Duration,
val failOnAckTimeout: Boolean
val failStreamOnAckTimeout: Boolean
) extends akka.stream.alpakka.jms.JmsSettings {

/** Factory to use for creating JMS connections. */
Expand Down Expand Up @@ -76,8 +76,8 @@ final class JmsConsumerSettings private (
/**
* For use with transactions, if true the stream fails if Alpakka rolls back the transaction when `ackTimeout` is hit.
*/
def withFailOnAckTimeout(value: Boolean): JmsConsumerSettings =
if (failOnAckTimeout == value) this else copy(failOnAckTimeout = value)
def withFailStreamOnAckTimeout(value: Boolean): JmsConsumerSettings =
if (failStreamOnAckTimeout == value) this else copy(failStreamOnAckTimeout = value)

private def copy(
connectionFactory: javax.jms.ConnectionFactory = connectionFactory,
Expand All @@ -89,7 +89,7 @@ final class JmsConsumerSettings private (
selector: Option[String] = selector,
acknowledgeMode: Option[AcknowledgeMode] = acknowledgeMode,
ackTimeout: scala.concurrent.duration.Duration = ackTimeout,
failOnAckTimeout: Boolean = failOnAckTimeout
failStreamOnAckTimeout: Boolean = failStreamOnAckTimeout
): JmsConsumerSettings = new JmsConsumerSettings(
connectionFactory = connectionFactory,
connectionRetrySettings = connectionRetrySettings,
Expand All @@ -100,7 +100,7 @@ final class JmsConsumerSettings private (
selector = selector,
acknowledgeMode = acknowledgeMode,
ackTimeout = ackTimeout,
failOnAckTimeout = failOnAckTimeout
failStreamOnAckTimeout = failStreamOnAckTimeout
)

override def toString =
Expand All @@ -114,8 +114,8 @@ final class JmsConsumerSettings private (
s"selector=$selector," +
s"acknowledgeMode=${acknowledgeMode.map(m => AcknowledgeMode.asString(m))}," +
s"ackTimeout=${ackTimeout.toCoarsest}," +
s"failOnAckTimeout=$failOnAckTimeout"
")"
s"failStreamOnAckTimeout=$failStreamOnAckTimeout" +
")"
}

object JmsConsumerSettings {
Expand Down Expand Up @@ -145,7 +145,7 @@ object JmsConsumerSettings {
val acknowledgeMode =
getOption("acknowledge-mode", c => AcknowledgeMode.from(c.getString("acknowledge-mode")))
val ackTimeout = c.getDuration("ack-timeout").asScala
val failOnAckTimeout = c.getBoolean("fail-on-ack-timeout")
val failStreamOnAckTimeout = c.getBoolean("fail-stream-on-ack-timeout")
new JmsConsumerSettings(
connectionFactory,
connectionRetrySettings,
Expand All @@ -156,7 +156,7 @@ object JmsConsumerSettings {
selector,
acknowledgeMode,
ackTimeout,
failOnAckTimeout
failStreamOnAckTimeout
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ private[jms] final class JmsTxSourceStage(settings: JmsConsumerSettings, destina
case _: TimeoutException =>
val exception = new JmsTxAckTimeout(settings.ackTimeout)
session.session.rollback()
if (settings.failOnAckTimeout) {
if (settings.failStreamOnAckTimeout) {
handleError.invoke(exception)
} else {
log.warning(exception.getMessage)
Expand Down
4 changes: 2 additions & 2 deletions jms/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ class JmsTxConnectorsSpec extends JmsSpec {
resultList.toSet should contain theSameElementsAs numsIn.map(_.toString)
}

"fail the stream when ack-timeout causes a rollback (and fail-on-ack-timeout is true)" in {
"fail the stream when ack-timeout causes a rollback (and fail-stream-on-ack-timeout is true)" in {
withConnectionFactory() { connectionFactory =>
val jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer.sink(
JmsProducerSettings(producerConfig, connectionFactory).withQueue("numbers")
Expand All @@ -591,7 +591,7 @@ class JmsTxConnectorsSpec extends JmsSpec {
.withSessionCount(5)
.withQueue("numbers")
.withAckTimeout(10.millis)
.withFailOnAckTimeout(true)
.withFailStreamOnAckTimeout(true)
)

val r = new java.util.Random
Expand Down

0 comments on commit f945041

Please sign in to comment.