Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jms Cross Compile Scala 3 #3009

Merged
merged 2 commits into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ lazy val ironmq = alpakkaProject(
Test / fork := true
)

lazy val jms = alpakkaProject("jms", "jms", Dependencies.Jms)
lazy val jms = alpakkaProject("jms", "jms", Dependencies.Jms, Scala3.settings)

lazy val jsonStreaming = alpakkaProject("json-streaming", "json.streaming", Dependencies.JsonStreaming)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ private[jms] final class JmsAckSourceStage(settings: JmsConsumerSettings, destin
override protected def initialAttributes: Attributes = Attributes.name("JmsAckConsumer")

private final class JmsAckSourceStageLogic(inheritedAttributes: Attributes)
extends SourceStageLogic[AckEnvelope](shape, out, settings, destination, inheritedAttributes) {
extends SourceStageLogic[AckEnvelope](shape, out, settings, destination, inheritedAttributes) { self =>
private val maxPendingAcks = settings.maxPendingAcks
private val maxAckInterval = settings.maxAckInterval

protected def createSession(connection: jms.Connection,
createDestination: jms.Session => javax.jms.Destination): JmsAckSession = {
val session =
connection.createSession(false, settings.acknowledgeMode.getOrElse(AcknowledgeMode.ClientAcknowledge).mode)
new JmsAckSession(connection, session, createDestination(session), destination, maxPendingAcks)
new JmsAckSession(connection, session, createDestination(session), self.destination, maxPendingAcks)
}

protected def pushMessage(msg: AckEnvelope): Unit = push(out, msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ import scala.util.{Failure, Success, Try}
* Internal API.
*/
@InternalApi
private[jms] trait JmsConnector[S <: JmsSession] {
this: TimerGraphStageLogic with StageLogging =>

private[jms] trait JmsConnector[S <: JmsSession] extends TimerGraphStageLogic with StageLogging {
import JmsConnector._

implicit protected var ec: ExecutionContext = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private[jms] final class JmsConsumerStage(settings: JmsConsumerSettings, destina
}

private final class JmsConsumerStageLogic(inheritedAttributes: Attributes)
extends SourceStageLogic[jms.Message](shape, out, settings, destination, inheritedAttributes) {
extends SourceStageLogic[jms.Message](shape, out, settings, destination, inheritedAttributes) { self =>

private val bufferSize = (settings.bufferSize + 1) * settings.sessionCount

Expand All @@ -43,7 +43,7 @@ private[jms] final class JmsConsumerStage(settings: JmsConsumerSettings, destina
createDestination: jms.Session => javax.jms.Destination): JmsConsumerSession = {
val session =
connection.createSession(false, settings.acknowledgeMode.getOrElse(AcknowledgeMode.AutoAcknowledge).mode)
new JmsConsumerSession(connection, session, createDestination(session), destination)
new JmsConsumerSession(connection, session, createDestination(session), self.destination)
}

protected def pushMessage(msg: jms.Message): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ private[jms] final class JmsTxSourceStage(settings: JmsConsumerSettings, destina
}

private final class JmsTxSourceStageLogic(inheritedAttributes: Attributes)
extends SourceStageLogic[TxEnvelope](shape, out, settings, destination, inheritedAttributes) {
extends SourceStageLogic[TxEnvelope](shape, out, settings, destination, inheritedAttributes) { self =>
protected def createSession(connection: jms.Connection, createDestination: jms.Session => javax.jms.Destination) = {
val session =
connection.createSession(true, settings.acknowledgeMode.getOrElse(AcknowledgeMode.SessionTransacted).mode)
new JmsConsumerSession(connection, session, createDestination(session), destination)
new JmsConsumerSession(connection, session, createDestination(session), self.destination)
}

protected def pushMessage(msg: TxEnvelope): Unit = push(out, msg)
Expand Down
2 changes: 1 addition & 1 deletion jms/src/test/scala/akka/stream/alpakka/jms/JmsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ abstract class JmsSpec
with Eventually
with LogCapturing {

implicit val system = ActorSystem(this.getClass.getSimpleName)
implicit val system: ActorSystem = ActorSystem(this.getClass.getSimpleName)

val consumerConfig = system.settings.config.getConfig(JmsConsumerSettings.configPath)
val producerConfig = system.settings.config.getConfig(JmsProducerSettings.configPath)
Expand Down