diff --git a/build.sbt b/build.sbt index cca2385406..0168a5ca74 100644 --- a/build.sbt +++ b/build.sbt @@ -269,7 +269,7 @@ lazy val ironmq = alpakkaProject( Test / fork := true ) -lazy val jms = alpakkaProject("jms", "jms", Dependencies.Jms) +lazy val jms = alpakkaProject("jms", "jms", Dependencies.Jms, Scala3.settings) lazy val jsonStreaming = alpakkaProject("json-streaming", "json.streaming", Dependencies.JsonStreaming) diff --git a/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsAckSourceStage.scala b/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsAckSourceStage.scala index 367569e76d..ad95c713e5 100644 --- a/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsAckSourceStage.scala +++ b/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsAckSourceStage.scala @@ -32,7 +32,7 @@ private[jms] final class JmsAckSourceStage(settings: JmsConsumerSettings, destin override protected def initialAttributes: Attributes = Attributes.name("JmsAckConsumer") private final class JmsAckSourceStageLogic(inheritedAttributes: Attributes) - extends SourceStageLogic[AckEnvelope](shape, out, settings, destination, inheritedAttributes) { + extends SourceStageLogic[AckEnvelope](shape, out, settings, destination, inheritedAttributes) { self => private val maxPendingAcks = settings.maxPendingAcks private val maxAckInterval = settings.maxAckInterval @@ -40,7 +40,7 @@ private[jms] final class JmsAckSourceStage(settings: JmsConsumerSettings, destin createDestination: jms.Session => javax.jms.Destination): JmsAckSession = { val session = connection.createSession(false, settings.acknowledgeMode.getOrElse(AcknowledgeMode.ClientAcknowledge).mode) - new JmsAckSession(connection, session, createDestination(session), destination, maxPendingAcks) + new JmsAckSession(connection, session, createDestination(session), self.destination, maxPendingAcks) } protected def pushMessage(msg: AckEnvelope): Unit = push(out, msg) diff --git a/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsConnector.scala b/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsConnector.scala index 3721edaa2a..755f16a91d 100644 --- a/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsConnector.scala +++ b/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsConnector.scala @@ -26,9 +26,7 @@ import scala.util.{Failure, Success, Try} * Internal API. */ @InternalApi -private[jms] trait JmsConnector[S <: JmsSession] { - this: TimerGraphStageLogic with StageLogging => - +private[jms] trait JmsConnector[S <: JmsSession] extends TimerGraphStageLogic with StageLogging { import JmsConnector._ implicit protected var ec: ExecutionContext = _ diff --git a/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsConsumerStage.scala b/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsConsumerStage.scala index 3bcf188fc1..7abad113f1 100644 --- a/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsConsumerStage.scala +++ b/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsConsumerStage.scala @@ -33,7 +33,7 @@ private[jms] final class JmsConsumerStage(settings: JmsConsumerSettings, destina } private final class JmsConsumerStageLogic(inheritedAttributes: Attributes) - extends SourceStageLogic[jms.Message](shape, out, settings, destination, inheritedAttributes) { + extends SourceStageLogic[jms.Message](shape, out, settings, destination, inheritedAttributes) { self => private val bufferSize = (settings.bufferSize + 1) * settings.sessionCount @@ -43,7 +43,7 @@ private[jms] final class JmsConsumerStage(settings: JmsConsumerSettings, destina createDestination: jms.Session => javax.jms.Destination): JmsConsumerSession = { val session = connection.createSession(false, settings.acknowledgeMode.getOrElse(AcknowledgeMode.AutoAcknowledge).mode) - new JmsConsumerSession(connection, session, createDestination(session), destination) + new JmsConsumerSession(connection, session, createDestination(session), self.destination) } protected def pushMessage(msg: jms.Message): Unit = { diff --git a/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsTxSourceStage.scala b/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsTxSourceStage.scala index c74bd62bca..32f964d3ff 100644 --- a/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsTxSourceStage.scala +++ b/jms/src/main/scala/akka/stream/alpakka/jms/impl/JmsTxSourceStage.scala @@ -33,11 +33,11 @@ private[jms] final class JmsTxSourceStage(settings: JmsConsumerSettings, destina } private final class JmsTxSourceStageLogic(inheritedAttributes: Attributes) - extends SourceStageLogic[TxEnvelope](shape, out, settings, destination, inheritedAttributes) { + extends SourceStageLogic[TxEnvelope](shape, out, settings, destination, inheritedAttributes) { self => protected def createSession(connection: jms.Connection, createDestination: jms.Session => javax.jms.Destination) = { val session = connection.createSession(true, settings.acknowledgeMode.getOrElse(AcknowledgeMode.SessionTransacted).mode) - new JmsConsumerSession(connection, session, createDestination(session), destination) + new JmsConsumerSession(connection, session, createDestination(session), self.destination) } protected def pushMessage(msg: TxEnvelope): Unit = push(out, msg) diff --git a/jms/src/test/scala/akka/stream/alpakka/jms/JmsSpec.scala b/jms/src/test/scala/akka/stream/alpakka/jms/JmsSpec.scala index 7fb653b940..eeef26ce3b 100644 --- a/jms/src/test/scala/akka/stream/alpakka/jms/JmsSpec.scala +++ b/jms/src/test/scala/akka/stream/alpakka/jms/JmsSpec.scala @@ -26,7 +26,7 @@ abstract class JmsSpec with Eventually with LogCapturing { - implicit val system = ActorSystem(this.getClass.getSimpleName) + implicit val system: ActorSystem = ActorSystem(this.getClass.getSimpleName) val consumerConfig = system.settings.config.getConfig(JmsConsumerSettings.configPath) val producerConfig = system.settings.config.getConfig(JmsProducerSettings.configPath)