From 5b8fbfccd748696ccd8527caa2090c5d63afbeef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pawlik?= Date: Mon, 29 May 2023 10:10:34 +0200 Subject: [PATCH 01/11] rewrite alpakka to pekko --- build.sbt | 5 +++- .../connectors/activemq/JmsConnector.scala | 4 ++-- .../pass4s/connectors/activemq/common.scala | 2 +- .../pass4s/connectors/activemq/consumer.scala | 8 +++---- .../pass4s/connectors/activemq/producer.scala | 6 ++--- .../pass4s/connectors/activemq/taps.scala | 24 +++++++++---------- .../ocadotechnology/pass4s/demo/akka.scala | 2 +- .../connectors/jms/JmsRecoveryTests.scala | 4 ++-- .../pass4s/util/EmbeddedJmsBroker.scala | 2 +- 9 files changed, 30 insertions(+), 27 deletions(-) diff --git a/build.sbt b/build.sbt index ca8154e5..8696fdf1 100644 --- a/build.sbt +++ b/build.sbt @@ -105,8 +105,11 @@ val nettySnykOverrides = Seq( lazy val activemq = module("activemq", directory = "connectors") .settings( name := "pass4s-connector-activemq", + resolvers += "Apache Snapshots" at "https://repository.apache.org/content/repositories/snapshots/", + resolvers ++= Resolver.sonatypeOssRepos("snapshots"), libraryDependencies ++= Seq( - "com.lightbend.akka" %% "akka-stream-alpakka-jms" % "4.0.0", // 5.x.x contains akka-streams +2.7.x which is licensed under BUSL 1.1 + "org.apache.pekko" %% "pekko-connectors-jms" % "0.0.0+99-44451f91-SNAPSHOT", + // "com.lightbend.akka" %% "akka-stream-alpakka-jms" % "4.0.0", // 5.x.x contains akka-streams +2.7.x which is licensed under BUSL 1.1 "org.apache.activemq" % "activemq-pool" % Versions.ActiveMq, "org.typelevel" %% "log4cats-core" % Versions.Log4Cats ), diff --git a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala index 82b15343..cf592793 100644 --- a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala +++ b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala @@ -16,8 +16,8 @@ package com.ocadotechnology.pass4s.connectors.activemq -import akka.actor.ActorSystem -import akka.stream.{RestartSettings => AkkaRestartSettings} +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.{RestartSettings => AkkaRestartSettings} import cats.effect.Resource import com.ocadotechnology.pass4s.connectors.activemq.JmsSource.JmsSourceSettings import com.ocadotechnology.pass4s.connectors.activemq.consumer._ diff --git a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala index efa6a58b..d719fe33 100644 --- a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala +++ b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala @@ -16,7 +16,7 @@ package com.ocadotechnology.pass4s.connectors.activemq -import akka.stream.alpakka.{jms => alpakka} +import org.apache.pekko.stream.connectors.{jms => alpakka} private[activemq] object common { diff --git a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala index 140827e6..533f94d4 100644 --- a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala +++ b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala @@ -16,10 +16,10 @@ package com.ocadotechnology.pass4s.connectors.activemq -import akka.actor.ActorSystem -import akka.stream.alpakka.jms.scaladsl.JmsConsumer -import akka.stream.alpakka.{jms => alpakka} -import akka.stream.scaladsl.RestartSource +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.connectors.jms.scaladsl.JmsConsumer +import org.apache.pekko.stream.connectors.{jms => alpakka} +import org.apache.pekko.stream.scaladsl.RestartSource import cats.ApplicativeThrow import cats.effect.Async import cats.effect.Sync diff --git a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala index b23e4711..2c241251 100644 --- a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala +++ b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala @@ -16,9 +16,9 @@ package com.ocadotechnology.pass4s.connectors.activemq -import akka.actor.ActorSystem -import akka.stream.alpakka.jms.scaladsl.JmsProducer -import akka.stream.alpakka.{jms => alpakka} +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.connectors.jms.scaladsl.JmsProducer +import org.apache.pekko.stream.connectors.{jms => alpakka} import cats.ApplicativeThrow import cats.effect.Concurrent import cats.effect.Resource diff --git a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala index ed00d6b3..e9dd0141 100644 --- a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala +++ b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala @@ -16,18 +16,18 @@ package com.ocadotechnology.pass4s.connectors.activemq -import akka.stream.FlowShape -import akka.stream.Graph -import akka.stream.Materializer -import akka.stream.OverflowStrategy -import akka.stream.QueueOfferResult -import akka.stream.SourceShape -import akka.stream.StreamDetachedException -import akka.stream.scaladsl.Keep -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.SinkQueueWithCancel -import akka.stream.scaladsl.Source -import akka.stream.scaladsl.SourceQueueWithComplete +import org.apache.pekko.stream.FlowShape +import org.apache.pekko.stream.Graph +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.OverflowStrategy +import org.apache.pekko.stream.QueueOfferResult +import org.apache.pekko.stream.SourceShape +import org.apache.pekko.stream.StreamDetachedException +import org.apache.pekko.stream.scaladsl.Keep +import org.apache.pekko.stream.scaladsl.Sink +import org.apache.pekko.stream.scaladsl.SinkQueueWithCancel +import org.apache.pekko.stream.scaladsl.Source +import org.apache.pekko.stream.scaladsl.SourceQueueWithComplete import cats.effect.Async import cats.effect.kernel.Resource.ExitCase diff --git a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/akka.scala b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/akka.scala index 66ca380b..4165336c 100644 --- a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/akka.scala +++ b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/akka.scala @@ -16,7 +16,7 @@ package com.ocadotechnology.pass4s.demo -import akka.actor.ActorSystem +import org.apache.pekko.actor.ActorSystem import cats.effect.Async import cats.effect.Resource diff --git a/src/it/scala/com/ocadotechnology/pass4s/connectors/jms/JmsRecoveryTests.scala b/src/it/scala/com/ocadotechnology/pass4s/connectors/jms/JmsRecoveryTests.scala index 450f7efb..df0d7338 100644 --- a/src/it/scala/com/ocadotechnology/pass4s/connectors/jms/JmsRecoveryTests.scala +++ b/src/it/scala/com/ocadotechnology/pass4s/connectors/jms/JmsRecoveryTests.scala @@ -1,7 +1,7 @@ package com.ocadotechnology.pass4s.connectors.jms -import akka.actor.ActorSystem -import akka.stream.alpakka.jms.ConnectionRetryException +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.connectors.jms.ConnectionRetryException import cats.effect.IO import cats.effect.Resource import cats.effect.kernel.Deferred diff --git a/src/it/scala/com/ocadotechnology/pass4s/util/EmbeddedJmsBroker.scala b/src/it/scala/com/ocadotechnology/pass4s/util/EmbeddedJmsBroker.scala index 3615f622..46d83a66 100644 --- a/src/it/scala/com/ocadotechnology/pass4s/util/EmbeddedJmsBroker.scala +++ b/src/it/scala/com/ocadotechnology/pass4s/util/EmbeddedJmsBroker.scala @@ -1,6 +1,6 @@ package com.ocadotechnology.pass4s.util -import akka.actor.ActorSystem +import org.apache.pekko.actor.ActorSystem import cats.effect.IO import cats.effect.Resource From 66918925997274e8049b7980c1299a8a506b9b06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pawlik?= Date: Mon, 29 May 2023 10:16:23 +0200 Subject: [PATCH 02/11] bump base version --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 8696fdf1..5afd9b1a 100644 --- a/build.sbt +++ b/build.sbt @@ -1,4 +1,4 @@ -ThisBuild / tlBaseVersion := "0.4" // current series x.y +ThisBuild / tlBaseVersion := "0.5" // current series x.y ThisBuild / organization := "com.ocadotechnology" ThisBuild / organizationName := "Ocado Technology" From 75d4b3e70f9e8e223bc16b8a1e07c5ea669c7efe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pawlik?= Date: Mon, 29 May 2023 10:26:40 +0200 Subject: [PATCH 03/11] fix integration tests --- .../pass4s/util/LocalStackContainerUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/it/scala/com/ocadotechnology/pass4s/util/LocalStackContainerUtils.scala b/src/it/scala/com/ocadotechnology/pass4s/util/LocalStackContainerUtils.scala index 619031f1..c113685a 100644 --- a/src/it/scala/com/ocadotechnology/pass4s/util/LocalStackContainerUtils.scala +++ b/src/it/scala/com/ocadotechnology/pass4s/util/LocalStackContainerUtils.scala @@ -29,7 +29,7 @@ import software.amazon.awssdk.services.sqs.model.CreateQueueRequest import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest import software.amazon.awssdk.services.sqs.model.QueueAttributeName -import scala.compat.java8.FutureConverters._ +import scala.jdk.FutureConverters._ import scala.jdk.CollectionConverters._ import scala.util.Random @@ -162,7 +162,7 @@ object LocalStackContainerUtils { ) _ <- kinesisClient.waiter.flatMap { waiter => val describeStreamRequest = DescribeStreamRequest.builder().streamName(sn).build() - IO.fromFuture(IO(waiter.waitUntilStreamExists(describeStreamRequest).toScala)) + IO.fromFuture(IO(waiter.waitUntilStreamExists(describeStreamRequest).asScala)) } } yield sn)(sn => kinesisClient.deleteStream(DeleteStreamRequest.builder().streamName(sn).build()).void) From 3e9f062cbf99087b9fe087f7750d620f173ed69a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pawlik?= Date: Mon, 29 May 2023 15:52:30 +0200 Subject: [PATCH 04/11] drop alpakka references --- build.sbt | 1 - .../pass4s/connectors/activemq/common.scala | 8 ++++---- .../pass4s/connectors/activemq/consumer.scala | 8 ++++---- .../pass4s/connectors/activemq/producer.scala | 12 ++++++------ 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/build.sbt b/build.sbt index 5afd9b1a..36f71198 100644 --- a/build.sbt +++ b/build.sbt @@ -109,7 +109,6 @@ lazy val activemq = module("activemq", directory = "connectors") resolvers ++= Resolver.sonatypeOssRepos("snapshots"), libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-connectors-jms" % "0.0.0+99-44451f91-SNAPSHOT", - // "com.lightbend.akka" %% "akka-stream-alpakka-jms" % "4.0.0", // 5.x.x contains akka-streams +2.7.x which is licensed under BUSL 1.1 "org.apache.activemq" % "activemq-pool" % Versions.ActiveMq, "org.typelevel" %% "log4cats-core" % Versions.Log4Cats ), diff --git a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala index d719fe33..7a9dc43b 100644 --- a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala +++ b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala @@ -16,13 +16,13 @@ package com.ocadotechnology.pass4s.connectors.activemq -import org.apache.pekko.stream.connectors.{jms => alpakka} +import org.apache.pekko.stream.connectors.{jms => pekkojms} private[activemq] object common { - def toAlpakkaDestination: (String, Jms.Type) => alpakka.Destination = { - case (name, Jms.Type.Topic) => alpakka.Topic(name) - case (name, Jms.Type.Queue) => alpakka.Queue(name) + def toPekkoDestination: (String, Jms.Type) => pekkojms.Destination = { + case (name, Jms.Type.Topic) => pekkojms.Topic(name) + case (name, Jms.Type.Queue) => pekkojms.Queue(name) } } diff --git a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala index 533f94d4..2c7023df 100644 --- a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala +++ b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala @@ -18,7 +18,7 @@ package com.ocadotechnology.pass4s.connectors.activemq import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.connectors.jms.scaladsl.JmsConsumer -import org.apache.pekko.stream.connectors.{jms => alpakka} +import org.apache.pekko.stream.connectors.{jms => pekkojms} import org.apache.pekko.stream.scaladsl.RestartSource import cats.ApplicativeThrow import cats.effect.Async @@ -48,12 +48,12 @@ private[activemq] object consumer { for { JmsSource(name, sourceType, settings) <- Stream.eval(extractJmsSource[F](source)) - jmsConsumerSettings = alpakka + jmsConsumerSettings = pekkojms .JmsConsumerSettings(as, connectionFactory) .withAckTimeout((settings.messageProcessingTimeout + 1.second) * 1.2) .withSessionCount(settings.parallelSessions) .withFailStreamOnAckTimeout(true) - .withDestination(common.toAlpakkaDestination(name, sourceType)) + .withDestination(common.toPekkoDestination(name, sourceType)) txEnvelope <- RestartSource .withBackoff(settings.restartSettings.toAkka) { () => @@ -72,7 +72,7 @@ private[activemq] object consumer { ) } - private def toCommittableMessage[F[_]: Sync: Logger](txEnvelope: alpakka.TxEnvelope): F[Option[CommittableMessage[F]]] = { + private def toCommittableMessage[F[_]: Sync: Logger](txEnvelope: pekkojms.TxEnvelope): F[Option[CommittableMessage[F]]] = { val commit = Sync[F].delay(txEnvelope.commit()) val rollback = Sync[F].delay(txEnvelope.rollback()) txEnvelope.message match { diff --git a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala index 2c241251..b0d8f634 100644 --- a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala +++ b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala @@ -18,7 +18,7 @@ package com.ocadotechnology.pass4s.connectors.activemq import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.connectors.jms.scaladsl.JmsProducer -import org.apache.pekko.stream.connectors.{jms => alpakka} +import org.apache.pekko.stream.connectors.{jms => pekkojms} import cats.ApplicativeThrow import cats.effect.Concurrent import cats.effect.Resource @@ -43,7 +43,7 @@ private[activemq] object producer { private type Attempt = Either[Throwable, Unit] private type Promise[F[_]] = Deferred[F, Attempt] - private type JmsPayload[F[_]] = alpakka.JmsEnvelope[Promise[F]] + private type JmsPayload[F[_]] = pekkojms.JmsEnvelope[Promise[F]] def createMessageProducer[F[_]: Async]( connectionFactory: jms.ConnectionFactory, @@ -65,9 +65,9 @@ private[activemq] object producer { for { jmsDestination <- extractJmsDestination[F](message.destination) promise <- Deferred[F, Attempt] - alpakkaMessage = alpakka.JmsTextMessage(message.payload.text, promise).withProperties(message.payload.metadata) - alpakkaDestination = common.toAlpakkaDestination(jmsDestination.name, jmsDestination.destinationType) - _ <- enqueue(alpakkaMessage.to(alpakkaDestination)) + pekkoMessage = pekkojms.JmsTextMessage(message.payload.text, promise).withProperties(message.payload.metadata) + pekkoDestination = common.toPekkoDestination(jmsDestination.name, jmsDestination.destinationType) + _ <- enqueue(pekkoMessage.to(pekkoDestination)) _ <- promise.get.rethrow } yield () @@ -87,7 +87,7 @@ private[activemq] object producer { * that the promise completion and Ref update are atomic */ Stream.eval((Ref.of[F, Set[JmsPayload[F]]](Set.empty), Semaphore[F](n = 1)).tupled).flatMap { case (inflightMessages, semaphore) => - val jmsProducerSettings = alpakka + val jmsProducerSettings = pekkojms .JmsProducerSettings(as, connectionFactory) .withTopic("Pass4s.Default") // default destination is obligatory, but always overridden From 0b7af2dcbf047a6e8f1a3be9e459b34666b88b8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pawlik?= Date: Fri, 4 Aug 2023 14:51:50 +0200 Subject: [PATCH 05/11] introduce pekko-based activemq connector along akka --- build.sbt | 24 ++- .../activemq/ConnectionFactories.scala | 49 ++++++ .../connectors/activemq/JmsConnector.scala | 126 +++++++++++++++ .../pass4s/connectors/activemq/common.scala | 28 ++++ .../pass4s/connectors/activemq/consumer.scala | 98 ++++++++++++ .../pass4s/connectors/activemq/producer.scala | 127 +++++++++++++++ .../pass4s/connectors/activemq/taps.scala | 150 ++++++++++++++++++ .../connectors/activemq/JmsConnector.scala | 4 +- .../pass4s/connectors/activemq/common.scala | 8 +- .../pass4s/connectors/activemq/consumer.scala | 14 +- .../pass4s/connectors/activemq/producer.scala | 16 +- .../pass4s/connectors/activemq/taps.scala | 24 +-- .../pass4s/demo/DemoMain.scala | 2 +- .../pass4s/demo/{akka.scala => pekko.scala} | 2 +- 14 files changed, 631 insertions(+), 41 deletions(-) create mode 100644 connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/ConnectionFactories.scala create mode 100644 connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala create mode 100644 connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala create mode 100644 connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala create mode 100644 connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala create mode 100644 connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala rename demo/src/main/scala/com/ocadotechnology/pass4s/demo/{akka.scala => pekko.scala} (98%) diff --git a/build.sbt b/build.sbt index 36f71198..a47f064d 100644 --- a/build.sbt +++ b/build.sbt @@ -60,8 +60,8 @@ lazy val root = (project in file(".")) IntegrationTest / classDirectory := (Test / classDirectory).value, IntegrationTest / parallelExecution := true ) - .aggregate(core, kernel, high, activemq, kinesis, sns, sqs, circe, phobos, plaintext, extra, logging, demo, s3Proxy) - .dependsOn(high, activemq, kinesis, sns, sqs, circe, logging, extra, s3Proxy) + .aggregate(core, kernel, high, activemqAkka, activemqPekko, kinesis, sns, sqs, circe, phobos, plaintext, extra, logging, demo, s3Proxy) + .dependsOn(high, activemqAkka, activemqPekko, kinesis, sns, sqs, circe, logging, extra, s3Proxy) def module(name: String, directory: String = ".") = Project(s"pass4s-$name", file(directory) / name).settings(commonSettings) @@ -102,13 +102,25 @@ val nettySnykOverrides = Seq( "io.netty" % "netty-handler" % nettyVersion ) -lazy val activemq = module("activemq", directory = "connectors") +lazy val activemqAkka = module("activemq", directory = "connectors") + .settings( + name := "pass4s-connector-activemq", + libraryDependencies ++= Seq( + "com.lightbend.akka" %% "akka-stream-alpakka-jms" % "4.0.0", // 5.x.x contains akka-streams +2.7.x which is licensed under BUSL 1.1 + "org.apache.activemq" % "activemq-pool" % Versions.ActiveMq, + "org.typelevel" %% "log4cats-core" % Versions.Log4Cats + ), + headerSources / excludeFilter := HiddenFileFilter || "taps.scala" + ) + .dependsOn(core) + +lazy val activemqPekko = module("activemq-pekko", directory = "connectors") .settings( name := "pass4s-connector-activemq", resolvers += "Apache Snapshots" at "https://repository.apache.org/content/repositories/snapshots/", resolvers ++= Resolver.sonatypeOssRepos("snapshots"), libraryDependencies ++= Seq( - "org.apache.pekko" %% "pekko-connectors-jms" % "0.0.0+99-44451f91-SNAPSHOT", + "org.apache.pekko" %% "pekko-connectors-jms" % "0.0.0+140-7d704044-SNAPSHOT", // TODO to be changed to stable release once https://github.com/apache/incubator-pekko-connectors/issues/210 is ready "org.apache.activemq" % "activemq-pool" % Versions.ActiveMq, "org.typelevel" %% "log4cats-core" % Versions.Log4Cats ), @@ -208,7 +220,7 @@ lazy val docs = project // new documentation project WorkflowStep.Sbt(List("docs/mdoc")) ) ) - .dependsOn(high, activemq, kinesis, sns, sqs, circe, logging, extra, s3Proxy) + .dependsOn(high, activemqAkka, activemqPekko, kinesis, sns, sqs, circe, logging, extra, s3Proxy) .enablePlugins(MdocPlugin, DocusaurusPlugin) // misc @@ -225,7 +237,7 @@ lazy val demo = module("demo") "ch.qos.logback" % "logback-classic" % Versions.Logback ) ) - .dependsOn(activemq, sns, sqs, extra, logging) + .dependsOn(activemqPekko, sns, sqs, extra, logging) lazy val commonSettings = Seq( organization := "com.ocadotechnology", diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/ConnectionFactories.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/ConnectionFactories.scala new file mode 100644 index 00000000..b6254811 --- /dev/null +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/ConnectionFactories.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2023 Ocado Technology + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ocadotechnology.pass4s.connectors.activemq + +import cats.effect.Resource +import cats.effect.Sync +import cats.implicits._ +import org.apache.activemq.ActiveMQConnectionFactory +import org.apache.activemq.pool.PooledConnectionFactory + +object ConnectionFactories { + + /** Creates a pooled ActiveMQ connection factory. + * + * Use `failover:(tcp://address)` to be sure that send operation is never failing. (Connection retries are handled by connection factory) + * Read documentation: https://activemq.apache.org/failover-transport-reference.html + * + * Use raw `tcp://address` to make send operation able to fail. This may be useful when working with outbox pattern + */ + def pooled[F[_]: Sync](username: String, password: String, url: String): Resource[F, PooledConnectionFactory] = + Resource.suspend { + unpooled[F](username, password, url).map(makePooled[F](_)) + } + + /** Creates an ActiveMQ connection factory. + */ + def unpooled[F[_]: Sync](username: String, password: String, url: String): F[ActiveMQConnectionFactory] = + Sync[F].delay(new ActiveMQConnectionFactory(username, password, url)) + + /** Wraps the base factory in a connection pool. + */ + def makePooled[F[_]: Sync](baseFactory: ActiveMQConnectionFactory): Resource[F, PooledConnectionFactory] = + Resource.make(Sync[F].delay(new PooledConnectionFactory(baseFactory)))(pcf => Sync[F].delay(pcf.stop())) + +} diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala new file mode 100644 index 00000000..cf592793 --- /dev/null +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala @@ -0,0 +1,126 @@ +/* + * Copyright 2023 Ocado Technology + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ocadotechnology.pass4s.connectors.activemq + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.{RestartSettings => AkkaRestartSettings} +import cats.effect.Resource +import com.ocadotechnology.pass4s.connectors.activemq.JmsSource.JmsSourceSettings +import com.ocadotechnology.pass4s.connectors.activemq.consumer._ +import com.ocadotechnology.pass4s.connectors.activemq.producer._ +import com.ocadotechnology.pass4s.core.CommittableMessage +import com.ocadotechnology.pass4s.core.Connector +import com.ocadotechnology.pass4s.core.Destination +import com.ocadotechnology.pass4s.core.Message +import com.ocadotechnology.pass4s.core.Source +import fs2.Stream +import org.typelevel.log4cats.Logger + +import javax.jms.ConnectionFactory +import scala.concurrent.duration._ +import scala.reflect.runtime.universe._ +import cats.effect.kernel.Async + +trait Jms + +object Jms { + sealed trait Type extends Product with Serializable + + object Type { + final case object Queue extends Type + final case object Topic extends Type + } + +} + +final case class JmsSource private (name: String, sourceType: Jms.Type, settings: JmsSourceSettings) extends Source[Jms] { + override val capability: Type = typeOf[Jms] + + override val messageProcessingTimeout: Option[FiniteDuration] = Some(settings.messageProcessingTimeout) + override val cancelableMessageProcessing: Boolean = settings.cancelableMessageProcessing + override val maxConcurrent: Int = settings.parallelSessions + + def toDestination: JmsDestination = JmsDestination(name, sourceType) +} + +object JmsSource { + + final case class JmsSourceSettings( + // sets internal timeout on a message processing. JMS' ackTimeout will be (x + 1 second) * 1.2 + messageProcessingTimeout: FiniteDuration = 30.seconds, + cancelableMessageProcessing: Boolean = true, + parallelSessions: Int = 1, + restartSettings: RestartSettings = RestartSettings(minBackoff = 2.second, maxBackoff = 30.seconds, randomFactor = 0.2) + ) + + final case class RestartSettings(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double) { + val toAkka: AkkaRestartSettings = AkkaRestartSettings(minBackoff, maxBackoff, randomFactor) + } + + def queue(name: String, settings: JmsSourceSettings = JmsSourceSettings()): JmsSource = JmsSource(name, Jms.Type.Queue, settings) + + def topic(name: String, settings: JmsSourceSettings = JmsSourceSettings()): JmsSource = JmsSource(name, Jms.Type.Topic, settings) +} + +final case class JmsDestination private (name: String, destinationType: Jms.Type) extends Destination[Jms] { + override val capability: Type = typeOf[Jms] + + def toSource(settings: JmsSourceSettings = JmsSourceSettings()): JmsSource = JmsSource(name, destinationType, settings) +} + +object JmsDestination { + def queue(name: String): JmsDestination = JmsDestination(name, Jms.Type.Queue) + + def topic(name: String): JmsDestination = JmsDestination(name, Jms.Type.Topic) +} + +object JmsConnector { + type JmsConnector[F[_]] = Connector.Aux[F, Jms, ConnectionFactory] + + // these might have to return resources, + // we might also have variants that build an Egress directly or have a conversion method on Connector + // (probably not, as methods on Connector shouldn't be used by end users) + def singleBroker[F[_]: Logger: Async]( + username: String, + password: String, + url: String + )( + implicit as: ActorSystem + ): Resource[F, JmsConnector[F]] = + ConnectionFactories.pooled(username, password, url).flatMap(singleBroker[F](_)) + + def singleBroker[F[_]: Logger: Async]( + connectionFactory: ConnectionFactory + )( + implicit as: ActorSystem + ): Resource[F, JmsConnector[F]] = + for { + producer <- createMessageProducer(connectionFactory) + } yield new Connector[F, Jms] { + + type Raw = ConnectionFactory + override val underlying: ConnectionFactory = connectionFactory + + override def consumeBatched[R >: Jms](source: Source[R]): Stream[F, List[CommittableMessage[F]]] = + consumeAndReconnectOnErrors(connectionFactory)(source).map(List(_)) + + override def produce[R >: Jms](message: Message[R]): F[Unit] = + producer(message) + + } + +} diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala new file mode 100644 index 00000000..7a9dc43b --- /dev/null +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2023 Ocado Technology + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ocadotechnology.pass4s.connectors.activemq + +import org.apache.pekko.stream.connectors.{jms => pekkojms} + +private[activemq] object common { + + def toPekkoDestination: (String, Jms.Type) => pekkojms.Destination = { + case (name, Jms.Type.Topic) => pekkojms.Topic(name) + case (name, Jms.Type.Queue) => pekkojms.Queue(name) + } + +} diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala new file mode 100644 index 00000000..2c7023df --- /dev/null +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala @@ -0,0 +1,98 @@ +/* + * Copyright 2023 Ocado Technology + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ocadotechnology.pass4s.connectors.activemq + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.connectors.jms.scaladsl.JmsConsumer +import org.apache.pekko.stream.connectors.{jms => pekkojms} +import org.apache.pekko.stream.scaladsl.RestartSource +import cats.ApplicativeThrow +import cats.effect.Async +import cats.effect.Sync +import cats.implicits._ +import com.ocadotechnology.pass4s.connectors.activemq.taps._ +import com.ocadotechnology.pass4s.core.Message.Payload +import com.ocadotechnology.pass4s.core.CommittableMessage +import com.ocadotechnology.pass4s.core.Source +import fs2.Stream +import org.typelevel.log4cats.Logger + +import javax.jms +import scala.jdk.CollectionConverters._ +import scala.concurrent.duration._ +import scala.util.Try + +private[activemq] object consumer { + + def consumeAndReconnectOnErrors[F[_]: Async: Logger]( + connectionFactory: jms.ConnectionFactory + )( + source: Source[_] + )( + implicit as: ActorSystem + ): Stream[F, CommittableMessage[F]] = + for { + JmsSource(name, sourceType, settings) <- Stream.eval(extractJmsSource[F](source)) + + jmsConsumerSettings = pekkojms + .JmsConsumerSettings(as, connectionFactory) + .withAckTimeout((settings.messageProcessingTimeout + 1.second) * 1.2) + .withSessionCount(settings.parallelSessions) + .withFailStreamOnAckTimeout(true) + .withDestination(common.toPekkoDestination(name, sourceType)) + + txEnvelope <- RestartSource + .withBackoff(settings.restartSettings.toAkka) { () => + JmsConsumer.txSource(jmsConsumerSettings).named(getClass.getSimpleName) + } + .toStream[F]() + committableMessage <- Stream.eval(toCommittableMessage(txEnvelope)).unNone + } yield committableMessage + + private def extractJmsSource[F[_]: ApplicativeThrow](source: Source[_]): F[JmsSource] = + source match { + case jmsSource: JmsSource => jmsSource.pure[F] + case unsupportedDestination => + ApplicativeThrow[F].raiseError( + new UnsupportedOperationException(s"JmsConnector does not support destination: $unsupportedDestination") + ) + } + + private def toCommittableMessage[F[_]: Sync: Logger](txEnvelope: pekkojms.TxEnvelope): F[Option[CommittableMessage[F]]] = { + val commit = Sync[F].delay(txEnvelope.commit()) + val rollback = Sync[F].delay(txEnvelope.rollback()) + txEnvelope.message match { + case textMessage: jms.TextMessage => + CommittableMessage.instance(Payload(textMessage.getText, getHeaders(textMessage)), commit, _ => rollback).some.pure[F] + case unsupportedMessage => + Logger[F].warn(s"JmsConnector supports only TextMessages. Ignoring received message: $unsupportedMessage") *> rollback.as(None) + } + } + + // fixme: add headers/properties from underlying message - need to double check if all properties are returned by getPropertyNames + private def getHeaders(msg: jms.Message): Map[String, String] = + Try { + msg + .getPropertyNames + .asIterator() + .asInstanceOf[java.util.Iterator[String]] // Please forgive me I have to do this, but underneath it is really String + .asScala + .map(name => name -> msg.getStringProperty(name)) + .toMap + }.getOrElse(Map.empty) + +} diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala new file mode 100644 index 00000000..b0d8f634 --- /dev/null +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala @@ -0,0 +1,127 @@ +/* + * Copyright 2023 Ocado Technology + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ocadotechnology.pass4s.connectors.activemq + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.connectors.jms.scaladsl.JmsProducer +import org.apache.pekko.stream.connectors.{jms => pekkojms} +import cats.ApplicativeThrow +import cats.effect.Concurrent +import cats.effect.Resource +import cats.effect.implicits._ +import cats.effect.kernel.Async +import cats.effect.kernel.Deferred +import cats.effect.kernel.Ref +import cats.effect.std.Queue +import cats.effect.std.Semaphore +import cats.implicits._ +import com.ocadotechnology.pass4s.connectors.activemq.taps._ +import com.ocadotechnology.pass4s.core.Destination +import com.ocadotechnology.pass4s.core.Message +import fs2.Pipe +import fs2.Stream + +import javax.jms + +private[activemq] object producer { + + type MessageProducer[F[_]] = Message[_] => F[Unit] + + private type Attempt = Either[Throwable, Unit] + private type Promise[F[_]] = Deferred[F, Attempt] + private type JmsPayload[F[_]] = pekkojms.JmsEnvelope[Promise[F]] + + def createMessageProducer[F[_]: Async]( + connectionFactory: jms.ConnectionFactory, + bufferSize: Int = 100 + )( + implicit as: ActorSystem + ): Resource[F, MessageProducer[F]] = + for { + queue <- Resource.eval(Queue.bounded[F, JmsPayload[F]](bufferSize)) + /** Stream.eval(queue.take) wouldn't work here because it takes only single element and terminates. In this case we need to take all + * elements but one by one as long as there's anything in the queue. Limit is set to one as we only process single message at a time, + * so that we don't reemit chukns in case of failure. + */ + _ <- Stream.fromQueueUnterminated(queue, limit = 1).through(sendMessageAndCompletePromise(connectionFactory)).compile.drain.background + } yield enqueueAndWaitForPromise[F](queue.offer) + + private def enqueueAndWaitForPromise[F[_]: Concurrent](enqueue: JmsPayload[F] => F[Unit]): MessageProducer[F] = + message => + for { + jmsDestination <- extractJmsDestination[F](message.destination) + promise <- Deferred[F, Attempt] + pekkoMessage = pekkojms.JmsTextMessage(message.payload.text, promise).withProperties(message.payload.metadata) + pekkoDestination = common.toPekkoDestination(jmsDestination.name, jmsDestination.destinationType) + _ <- enqueue(pekkoMessage.to(pekkoDestination)) + _ <- promise.get.rethrow + } yield () + + private def sendMessageAndCompletePromise[F[_]: Async]( + connectionFactory: jms.ConnectionFactory + )( + implicit as: ActorSystem + ): Pipe[F, JmsPayload[F], Unit] = { messages => + /* + * Note on `inflightMessages` Ref: + * Every message is added to Ref at the beginning of the message processing: + * - (happy path) after the message is sent the promise of this message is completed and the message is removed from Ref + * - (edge case) when `sendMessagePipe` crashes all inflightMessages are completed with Left and the Ref is cleaned + * + * Note on semaphore: + * Every operation on `inflightMessages` Ref is guarded by single permit uncancelable semaphore to guarantee + * that the promise completion and Ref update are atomic + */ + Stream.eval((Ref.of[F, Set[JmsPayload[F]]](Set.empty), Semaphore[F](n = 1)).tupled).flatMap { case (inflightMessages, semaphore) => + val jmsProducerSettings = pekkojms + .JmsProducerSettings(as, connectionFactory) + .withTopic("Pass4s.Default") // default destination is obligatory, but always overridden + + val sendMessagePipe: Pipe[F, JmsPayload[F], JmsPayload[F]] = + JmsProducer.flexiFlow[Promise[F]](jmsProducerSettings).named(getClass.getSimpleName).toPipe[F]() + + def addMessageToRef(pendingMessage: JmsPayload[F]) = + semaphore.permit.surround(inflightMessages.update(_ + pendingMessage)) + + def completeMessageAndRemoveFromRef(sentMessage: JmsPayload[F]) = + semaphore.permit.surround(sentMessage.passThrough.complete(Right(())).attempt *> inflightMessages.update(_ - sentMessage)) + + def failAllAndCleanRef(ex: Throwable) = + semaphore + .permit + .surround( + inflightMessages.get.flatMap(_.toList.traverse(_.passThrough.complete(Left(ex)).attempt)) *> inflightMessages.set(Set()) + ) + + messages + .evalTap(addMessageToRef) + .through(sendMessagePipe) + .attempts(Stream.constant(jmsProducerSettings.connectionRetrySettings.initialRetry)) + .evalMap(_.fold(failAllAndCleanRef, completeMessageAndRemoveFromRef)) + } + } + + private def extractJmsDestination[F[_]: ApplicativeThrow](destination: Destination[_]): F[JmsDestination] = + destination match { + case jmsDestination: JmsDestination => jmsDestination.pure[F] + case unsupportedDestination => + ApplicativeThrow[F].raiseError( + new UnsupportedOperationException(s"JmsConnector does not support destination: $unsupportedDestination") + ) + } + +} diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala new file mode 100644 index 00000000..e9dd0141 --- /dev/null +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala @@ -0,0 +1,150 @@ +/* + * Copyright 2021 Martin Krasser + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ocadotechnology.pass4s.connectors.activemq + +import org.apache.pekko.stream.FlowShape +import org.apache.pekko.stream.Graph +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.OverflowStrategy +import org.apache.pekko.stream.QueueOfferResult +import org.apache.pekko.stream.SourceShape +import org.apache.pekko.stream.StreamDetachedException +import org.apache.pekko.stream.scaladsl.Keep +import org.apache.pekko.stream.scaladsl.Sink +import org.apache.pekko.stream.scaladsl.SinkQueueWithCancel +import org.apache.pekko.stream.scaladsl.Source +import org.apache.pekko.stream.scaladsl.SourceQueueWithComplete +import cats.effect.Async + +import cats.effect.kernel.Resource.ExitCase +import cats.implicits._ +import fs2.Pipe +import fs2.Stream + +// Copied from https://github.com/krasserm/streamz due to the lack of CE3 support. https://github.com/krasserm/streamz/issues/85 +private[activemq] object taps { + + implicit class AkkaSourceDsl[A, M](source: Graph[SourceShape[A], M]) { + + def toStream[F[_]: Async](onMaterialization: M => Unit = _ => ())(implicit materializer: Materializer): Stream[F, A] = + akkaSourceToFs2Stream(source)(onMaterialization) + } + + implicit class AkkaFlowDsl[A, B, M](flow: Graph[FlowShape[A, B], M]) { + + def toPipe[F[_]: Async]( + onMaterialization: M => Unit = _ => () + )( + implicit materializer: Materializer + ): Pipe[F, A, B] = + akkaFlowToFs2Pipe(flow)(onMaterialization) + + } + + /** Converts an Akka Stream [[Graph]] of [[SourceShape]] to an FS2 [[Stream]]. The [[Graph]] is materialized when the [[Stream]]'s [[F]] + * in run. The materialized value can be obtained with the `onMaterialization` callback. + */ + private def akkaSourceToFs2Stream[F[_], A, M]( + source: Graph[SourceShape[A], M] + )( + onMaterialization: M => Unit + )( + implicit materializer: Materializer, + F: Async[F] + ): Stream[F, A] = + Stream.force { + F.delay { + val (mat, subscriber) = Source.fromGraph(source).toMat(Sink.queue[A]())(Keep.both).run() + onMaterialization(mat) + subscriberStream[F, A](subscriber) + } + } + + /** Converts an Akka Stream [[Graph]] of [[FlowShape]] to an FS2 [[Pipe]]. The [[Graph]] is materialized when the [[Pipe]]'s [[F]] in run. + * The materialized value can be obtained with the `onMaterialization` callback. + */ + private def akkaFlowToFs2Pipe[F[_], A, B, M]( + flow: Graph[FlowShape[A, B], M] + )( + onMaterialization: M => Unit + )( + implicit materializer: Materializer, + F: Async[F] + ): Pipe[F, A, B] = { s => + Stream.force { + F.delay { + val src = Source.queue[A](0, OverflowStrategy.backpressure) + val snk = Sink.queue[B]() + val ((publisher, mat), subscriber) = src.viaMat(flow)(Keep.both).toMat(snk)(Keep.both).run() + onMaterialization(mat) + transformerStream[F, A, B](subscriber, publisher, s) + } + } + } + + private def transformerStream[F[_]: Async, A, B]( + subscriber: SinkQueueWithCancel[B], + publisher: SourceQueueWithComplete[A], + stream: Stream[F, A] + ): Stream[F, B] = + subscriberStream[F, B](subscriber).concurrently(publisherStream[F, A](publisher, stream)) + + private def publisherStream[F[_], A]( + publisher: SourceQueueWithComplete[A], + stream: Stream[F, A] + )( + implicit F: Async[F] + ): Stream[F, Unit] = { + def publish(a: A): F[Option[Unit]] = + Async[F] + .fromFuture(F.delay(publisher.offer(a))) + .flatMap { + case QueueOfferResult.Enqueued => ().some.pure[F] + case QueueOfferResult.Failure(cause) => F.raiseError[Option[Unit]](cause) + case QueueOfferResult.QueueClosed => none[Unit].pure[F] + case QueueOfferResult.Dropped => + F.raiseError[Option[Unit]](new IllegalStateException("This should never happen because we use OverflowStrategy.backpressure")) + } + .recover { + // This handles a race condition between `interruptWhen` and `publish`. + // There's no guarantee that, when the akka sink is terminated, we will observe the + // `interruptWhen` termination before calling publish one last time. + // Such a call fails with StreamDetachedException + case _: StreamDetachedException => none[Unit] + } + + def watchCompletion: F[Unit] = Async[F].fromFuture(F.delay(publisher.watchCompletion())).void + def fail(e: Throwable): F[Unit] = F.delay(publisher.fail(e)) >> watchCompletion + def complete: F[Unit] = F.delay(publisher.complete()) >> watchCompletion + + stream + .interruptWhen(watchCompletion.attempt) + .evalMap(publish) + .unNoneTerminate + .onFinalizeCase { + case ExitCase.Succeeded | ExitCase.Canceled => complete + case ExitCase.Errored(e) => fail(e) + } + } + + private def subscriberStream[F[_], A](subscriber: SinkQueueWithCancel[A])(implicit F: Async[F]): Stream[F, A] = { + val pull = Async[F].fromFuture(F.delay(subscriber.pull())) + val cancel = F.delay(subscriber.cancel()) + Stream.repeatEval(pull).unNoneTerminate.onFinalize(cancel) + } + +} diff --git a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala index cf592793..82b15343 100644 --- a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala +++ b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala @@ -16,8 +16,8 @@ package com.ocadotechnology.pass4s.connectors.activemq -import org.apache.pekko.actor.ActorSystem -import org.apache.pekko.stream.{RestartSettings => AkkaRestartSettings} +import akka.actor.ActorSystem +import akka.stream.{RestartSettings => AkkaRestartSettings} import cats.effect.Resource import com.ocadotechnology.pass4s.connectors.activemq.JmsSource.JmsSourceSettings import com.ocadotechnology.pass4s.connectors.activemq.consumer._ diff --git a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala index 7a9dc43b..efa6a58b 100644 --- a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala +++ b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala @@ -16,13 +16,13 @@ package com.ocadotechnology.pass4s.connectors.activemq -import org.apache.pekko.stream.connectors.{jms => pekkojms} +import akka.stream.alpakka.{jms => alpakka} private[activemq] object common { - def toPekkoDestination: (String, Jms.Type) => pekkojms.Destination = { - case (name, Jms.Type.Topic) => pekkojms.Topic(name) - case (name, Jms.Type.Queue) => pekkojms.Queue(name) + def toAlpakkaDestination: (String, Jms.Type) => alpakka.Destination = { + case (name, Jms.Type.Topic) => alpakka.Topic(name) + case (name, Jms.Type.Queue) => alpakka.Queue(name) } } diff --git a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala index 2c7023df..140827e6 100644 --- a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala +++ b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala @@ -16,10 +16,10 @@ package com.ocadotechnology.pass4s.connectors.activemq -import org.apache.pekko.actor.ActorSystem -import org.apache.pekko.stream.connectors.jms.scaladsl.JmsConsumer -import org.apache.pekko.stream.connectors.{jms => pekkojms} -import org.apache.pekko.stream.scaladsl.RestartSource +import akka.actor.ActorSystem +import akka.stream.alpakka.jms.scaladsl.JmsConsumer +import akka.stream.alpakka.{jms => alpakka} +import akka.stream.scaladsl.RestartSource import cats.ApplicativeThrow import cats.effect.Async import cats.effect.Sync @@ -48,12 +48,12 @@ private[activemq] object consumer { for { JmsSource(name, sourceType, settings) <- Stream.eval(extractJmsSource[F](source)) - jmsConsumerSettings = pekkojms + jmsConsumerSettings = alpakka .JmsConsumerSettings(as, connectionFactory) .withAckTimeout((settings.messageProcessingTimeout + 1.second) * 1.2) .withSessionCount(settings.parallelSessions) .withFailStreamOnAckTimeout(true) - .withDestination(common.toPekkoDestination(name, sourceType)) + .withDestination(common.toAlpakkaDestination(name, sourceType)) txEnvelope <- RestartSource .withBackoff(settings.restartSettings.toAkka) { () => @@ -72,7 +72,7 @@ private[activemq] object consumer { ) } - private def toCommittableMessage[F[_]: Sync: Logger](txEnvelope: pekkojms.TxEnvelope): F[Option[CommittableMessage[F]]] = { + private def toCommittableMessage[F[_]: Sync: Logger](txEnvelope: alpakka.TxEnvelope): F[Option[CommittableMessage[F]]] = { val commit = Sync[F].delay(txEnvelope.commit()) val rollback = Sync[F].delay(txEnvelope.rollback()) txEnvelope.message match { diff --git a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala index b0d8f634..b23e4711 100644 --- a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala +++ b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala @@ -16,9 +16,9 @@ package com.ocadotechnology.pass4s.connectors.activemq -import org.apache.pekko.actor.ActorSystem -import org.apache.pekko.stream.connectors.jms.scaladsl.JmsProducer -import org.apache.pekko.stream.connectors.{jms => pekkojms} +import akka.actor.ActorSystem +import akka.stream.alpakka.jms.scaladsl.JmsProducer +import akka.stream.alpakka.{jms => alpakka} import cats.ApplicativeThrow import cats.effect.Concurrent import cats.effect.Resource @@ -43,7 +43,7 @@ private[activemq] object producer { private type Attempt = Either[Throwable, Unit] private type Promise[F[_]] = Deferred[F, Attempt] - private type JmsPayload[F[_]] = pekkojms.JmsEnvelope[Promise[F]] + private type JmsPayload[F[_]] = alpakka.JmsEnvelope[Promise[F]] def createMessageProducer[F[_]: Async]( connectionFactory: jms.ConnectionFactory, @@ -65,9 +65,9 @@ private[activemq] object producer { for { jmsDestination <- extractJmsDestination[F](message.destination) promise <- Deferred[F, Attempt] - pekkoMessage = pekkojms.JmsTextMessage(message.payload.text, promise).withProperties(message.payload.metadata) - pekkoDestination = common.toPekkoDestination(jmsDestination.name, jmsDestination.destinationType) - _ <- enqueue(pekkoMessage.to(pekkoDestination)) + alpakkaMessage = alpakka.JmsTextMessage(message.payload.text, promise).withProperties(message.payload.metadata) + alpakkaDestination = common.toAlpakkaDestination(jmsDestination.name, jmsDestination.destinationType) + _ <- enqueue(alpakkaMessage.to(alpakkaDestination)) _ <- promise.get.rethrow } yield () @@ -87,7 +87,7 @@ private[activemq] object producer { * that the promise completion and Ref update are atomic */ Stream.eval((Ref.of[F, Set[JmsPayload[F]]](Set.empty), Semaphore[F](n = 1)).tupled).flatMap { case (inflightMessages, semaphore) => - val jmsProducerSettings = pekkojms + val jmsProducerSettings = alpakka .JmsProducerSettings(as, connectionFactory) .withTopic("Pass4s.Default") // default destination is obligatory, but always overridden diff --git a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala index e9dd0141..ed00d6b3 100644 --- a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala +++ b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala @@ -16,18 +16,18 @@ package com.ocadotechnology.pass4s.connectors.activemq -import org.apache.pekko.stream.FlowShape -import org.apache.pekko.stream.Graph -import org.apache.pekko.stream.Materializer -import org.apache.pekko.stream.OverflowStrategy -import org.apache.pekko.stream.QueueOfferResult -import org.apache.pekko.stream.SourceShape -import org.apache.pekko.stream.StreamDetachedException -import org.apache.pekko.stream.scaladsl.Keep -import org.apache.pekko.stream.scaladsl.Sink -import org.apache.pekko.stream.scaladsl.SinkQueueWithCancel -import org.apache.pekko.stream.scaladsl.Source -import org.apache.pekko.stream.scaladsl.SourceQueueWithComplete +import akka.stream.FlowShape +import akka.stream.Graph +import akka.stream.Materializer +import akka.stream.OverflowStrategy +import akka.stream.QueueOfferResult +import akka.stream.SourceShape +import akka.stream.StreamDetachedException +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.SinkQueueWithCancel +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.SourceQueueWithComplete import cats.effect.Async import cats.effect.kernel.Resource.ExitCase diff --git a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/DemoMain.scala b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/DemoMain.scala index 265cbe61..8f8bf5f9 100644 --- a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/DemoMain.scala +++ b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/DemoMain.scala @@ -92,7 +92,7 @@ object DemoMain extends IOApp { // // - val brokerResource = Akka + val brokerResource = Pekko .system[IO] .flatMap { implicit sys => implicit val connectorLogger: Logger[IO] = Slf4jLogger.getLoggerFromClass[IO](classOf[Connector[IO, Jms]]) diff --git a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/akka.scala b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/pekko.scala similarity index 98% rename from demo/src/main/scala/com/ocadotechnology/pass4s/demo/akka.scala rename to demo/src/main/scala/com/ocadotechnology/pass4s/demo/pekko.scala index 4165336c..e72619f5 100644 --- a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/akka.scala +++ b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/pekko.scala @@ -23,7 +23,7 @@ import cats.effect.Resource import cats.effect.Sync import cats.implicits._ -object Akka { +object Pekko { def system[F[_]: Async]: Resource[F, ActorSystem] = Resource.make(Sync[F].delay(ActorSystem()))(sys => Async[F].fromFuture(Sync[F].delay(sys.terminate())).void) } From f16a16abf1d5608aec1cd693f482b196b18b6662 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pawlik?= Date: Fri, 4 Aug 2023 14:58:46 +0200 Subject: [PATCH 06/11] update workflows --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 498d2955..a5e42e93 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -88,11 +88,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target + run: mkdir -p core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/activemq-pekko/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target + run: tar cf targets.tar core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/activemq-pekko/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') From 00ee0c4c3789dca33eb505e421725d28350916f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pawlik?= Date: Mon, 7 Aug 2023 08:59:35 +0200 Subject: [PATCH 07/11] repackage pekko module, fix sbt module naming, make test rely on akka implementation --- build.sbt | 2 +- .../pass4s/connectors/activemq/ConnectionFactories.scala | 2 +- .../pass4s/connectors/activemq/JmsConnector.scala | 8 ++++---- .../pass4s/connectors/activemq/common.scala | 2 +- .../pass4s/connectors/activemq/consumer.scala | 4 ++-- .../pass4s/connectors/activemq/producer.scala | 4 ++-- .../ocadotechnology/pass4s/connectors/activemq/taps.scala | 2 +- .../scala/com/ocadotechnology/pass4s/demo/DemoMain.scala | 4 ++-- .../pass4s/connectors/jms/JmsRecoveryTests.scala | 4 ++-- .../ocadotechnology/pass4s/util/EmbeddedJmsBroker.scala | 3 +-- 10 files changed, 17 insertions(+), 18 deletions(-) diff --git a/build.sbt b/build.sbt index a47f064d..f4913953 100644 --- a/build.sbt +++ b/build.sbt @@ -116,7 +116,7 @@ lazy val activemqAkka = module("activemq", directory = "connectors") lazy val activemqPekko = module("activemq-pekko", directory = "connectors") .settings( - name := "pass4s-connector-activemq", + name := "pass4s-connector-pekko-activemq", resolvers += "Apache Snapshots" at "https://repository.apache.org/content/repositories/snapshots/", resolvers ++= Resolver.sonatypeOssRepos("snapshots"), libraryDependencies ++= Seq( diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/ConnectionFactories.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/ConnectionFactories.scala index b6254811..4feb6892 100644 --- a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/ConnectionFactories.scala +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/ConnectionFactories.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.ocadotechnology.pass4s.connectors.activemq +package com.ocadotechnology.pass4s.connectors.pekko.activemq import cats.effect.Resource import cats.effect.Sync diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala index cf592793..6a9152e3 100644 --- a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala @@ -14,14 +14,14 @@ * limitations under the License. */ -package com.ocadotechnology.pass4s.connectors.activemq +package com.ocadotechnology.pass4s.connectors.pekko.activemq import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.{RestartSettings => AkkaRestartSettings} import cats.effect.Resource -import com.ocadotechnology.pass4s.connectors.activemq.JmsSource.JmsSourceSettings -import com.ocadotechnology.pass4s.connectors.activemq.consumer._ -import com.ocadotechnology.pass4s.connectors.activemq.producer._ +import com.ocadotechnology.pass4s.connectors.pekko.activemq.JmsSource.JmsSourceSettings +import com.ocadotechnology.pass4s.connectors.pekko.activemq.consumer._ +import com.ocadotechnology.pass4s.connectors.pekko.activemq.producer._ import com.ocadotechnology.pass4s.core.CommittableMessage import com.ocadotechnology.pass4s.core.Connector import com.ocadotechnology.pass4s.core.Destination diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala index 7a9dc43b..dc818c3b 100644 --- a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.ocadotechnology.pass4s.connectors.activemq +package com.ocadotechnology.pass4s.connectors.pekko.activemq import org.apache.pekko.stream.connectors.{jms => pekkojms} diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala index 2c7023df..1a8ab6e0 100644 --- a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.ocadotechnology.pass4s.connectors.activemq +package com.ocadotechnology.pass4s.connectors.pekko.activemq import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.connectors.jms.scaladsl.JmsConsumer @@ -24,7 +24,7 @@ import cats.ApplicativeThrow import cats.effect.Async import cats.effect.Sync import cats.implicits._ -import com.ocadotechnology.pass4s.connectors.activemq.taps._ +import com.ocadotechnology.pass4s.connectors.pekko.activemq.taps._ import com.ocadotechnology.pass4s.core.Message.Payload import com.ocadotechnology.pass4s.core.CommittableMessage import com.ocadotechnology.pass4s.core.Source diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala index b0d8f634..26d9fce8 100644 --- a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.ocadotechnology.pass4s.connectors.activemq +package com.ocadotechnology.pass4s.connectors.pekko.activemq import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.connectors.jms.scaladsl.JmsProducer @@ -29,7 +29,7 @@ import cats.effect.kernel.Ref import cats.effect.std.Queue import cats.effect.std.Semaphore import cats.implicits._ -import com.ocadotechnology.pass4s.connectors.activemq.taps._ +import com.ocadotechnology.pass4s.connectors.pekko.activemq.taps._ import com.ocadotechnology.pass4s.core.Destination import com.ocadotechnology.pass4s.core.Message import fs2.Pipe diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala index e9dd0141..4a6260aa 100644 --- a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.ocadotechnology.pass4s.connectors.activemq +package com.ocadotechnology.pass4s.connectors.pekko.activemq import org.apache.pekko.stream.FlowShape import org.apache.pekko.stream.Graph diff --git a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/DemoMain.scala b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/DemoMain.scala index 8f8bf5f9..8958c10d 100644 --- a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/DemoMain.scala +++ b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/DemoMain.scala @@ -31,8 +31,8 @@ import cats.effect.implicits._ import cats.implicits._ import cats.~> import com.ocadotechnology.pass4s.circe.syntax._ -import com.ocadotechnology.pass4s.connectors.activemq.Jms -import com.ocadotechnology.pass4s.connectors.activemq.JmsConnector +import com.ocadotechnology.pass4s.connectors.pekko.activemq.Jms +import com.ocadotechnology.pass4s.connectors.pekko.activemq.JmsConnector import com.ocadotechnology.pass4s.core._ import com.ocadotechnology.pass4s.extra.MessageProcessor import com.ocadotechnology.pass4s.high._ diff --git a/src/it/scala/com/ocadotechnology/pass4s/connectors/jms/JmsRecoveryTests.scala b/src/it/scala/com/ocadotechnology/pass4s/connectors/jms/JmsRecoveryTests.scala index df0d7338..450f7efb 100644 --- a/src/it/scala/com/ocadotechnology/pass4s/connectors/jms/JmsRecoveryTests.scala +++ b/src/it/scala/com/ocadotechnology/pass4s/connectors/jms/JmsRecoveryTests.scala @@ -1,7 +1,7 @@ package com.ocadotechnology.pass4s.connectors.jms -import org.apache.pekko.actor.ActorSystem -import org.apache.pekko.stream.connectors.jms.ConnectionRetryException +import akka.actor.ActorSystem +import akka.stream.alpakka.jms.ConnectionRetryException import cats.effect.IO import cats.effect.Resource import cats.effect.kernel.Deferred diff --git a/src/it/scala/com/ocadotechnology/pass4s/util/EmbeddedJmsBroker.scala b/src/it/scala/com/ocadotechnology/pass4s/util/EmbeddedJmsBroker.scala index 46d83a66..1c279925 100644 --- a/src/it/scala/com/ocadotechnology/pass4s/util/EmbeddedJmsBroker.scala +++ b/src/it/scala/com/ocadotechnology/pass4s/util/EmbeddedJmsBroker.scala @@ -1,7 +1,6 @@ package com.ocadotechnology.pass4s.util -import org.apache.pekko.actor.ActorSystem - +import akka.actor.ActorSystem import cats.effect.IO import cats.effect.Resource import com.ocadotechnology.pass4s.connectors.activemq.JmsConnector From 490bbc355189fbae05979676ca1e474aef502e32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pawlik?= Date: Mon, 7 Aug 2023 09:04:38 +0200 Subject: [PATCH 08/11] fix demo module compilation --- .../scala/com/ocadotechnology/pass4s/demo/CirceDemo.scala | 4 ++-- .../scala/com/ocadotechnology/pass4s/demo/destinations.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/CirceDemo.scala b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/CirceDemo.scala index eb2596c1..2e0b2163 100644 --- a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/CirceDemo.scala +++ b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/CirceDemo.scala @@ -24,8 +24,8 @@ import cats.effect.kernel.Sync import cats.effect.kernel.Temporal import cats.implicits._ import com.ocadotechnology.pass4s.circe.syntax._ -import com.ocadotechnology.pass4s.connectors.activemq.Jms -import com.ocadotechnology.pass4s.connectors.activemq.JmsSource +import com.ocadotechnology.pass4s.connectors.pekko.activemq.Jms +import com.ocadotechnology.pass4s.connectors.pekko.activemq.JmsSource import com.ocadotechnology.pass4s.core.CommittableMessage import com.ocadotechnology.pass4s.core.Connector import com.ocadotechnology.pass4s.core.Message diff --git a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/destinations.scala b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/destinations.scala index 00072dfa..83f14cb7 100644 --- a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/destinations.scala +++ b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/destinations.scala @@ -16,8 +16,8 @@ package com.ocadotechnology.pass4s.demo -import com.ocadotechnology.pass4s.connectors.activemq.JmsDestination -import com.ocadotechnology.pass4s.connectors.activemq.JmsSource +import com.ocadotechnology.pass4s.connectors.pekko.activemq.JmsDestination +import com.ocadotechnology.pass4s.connectors.pekko.activemq.JmsSource object Destinations { val inventoryEvents = JmsSource.queue("Inventory.Events") From 3b1cc5d5f33e391bc35106cc5c1132acde7486a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pawlik?= Date: Mon, 7 Aug 2023 09:29:04 +0200 Subject: [PATCH 09/11] add experimental notice on pekko implementation --- .../pass4s/connectors/activemq/ConnectionFactories.scala | 3 +++ .../pass4s/connectors/activemq/JmsConnector.scala | 9 ++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/ConnectionFactories.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/ConnectionFactories.scala index 4feb6892..cbebad37 100644 --- a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/ConnectionFactories.scala +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/ConnectionFactories.scala @@ -22,6 +22,9 @@ import cats.implicits._ import org.apache.activemq.ActiveMQConnectionFactory import org.apache.activemq.pool.PooledConnectionFactory +/** This implementation is EXPERIMENTAL - use at your own risk This module relies on SNAPSHOT version of Pekko that has not been extensively + * tested in production yet + */ object ConnectionFactories { /** Creates a pooled ActiveMQ connection factory. diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala index 6a9152e3..b81ce53c 100644 --- a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala @@ -91,9 +91,9 @@ object JmsDestination { object JmsConnector { type JmsConnector[F[_]] = Connector.Aux[F, Jms, ConnectionFactory] - // these might have to return resources, - // we might also have variants that build an Egress directly or have a conversion method on Connector - // (probably not, as methods on Connector shouldn't be used by end users) + /** This implementation is EXPERIMENTAL - use at your own risk This module relies on SNAPSHOT version of Pekko that has not been + * extensively tested in production yet + */ def singleBroker[F[_]: Logger: Async]( username: String, password: String, @@ -103,6 +103,9 @@ object JmsConnector { ): Resource[F, JmsConnector[F]] = ConnectionFactories.pooled(username, password, url).flatMap(singleBroker[F](_)) + /** This implementation is EXPERIMENTAL - use at your own risk This module relies on SNAPSHOT version of Pekko that has not been + * extensively tested in production yet + */ def singleBroker[F[_]: Logger: Async]( connectionFactory: ConnectionFactory )( From 8ce79627ea937ed48526e63c1f956d6771ef9b7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pawlik?= Date: Mon, 7 Aug 2023 11:45:44 +0200 Subject: [PATCH 10/11] restore base version, since this change is compatible --- build.sbt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index f4913953..03e1fa6f 100644 --- a/build.sbt +++ b/build.sbt @@ -1,4 +1,4 @@ -ThisBuild / tlBaseVersion := "0.5" // current series x.y +ThisBuild / tlBaseVersion := "0.4" // current series x.y ThisBuild / organization := "com.ocadotechnology" ThisBuild / organizationName := "Ocado Technology" @@ -116,6 +116,7 @@ lazy val activemqAkka = module("activemq", directory = "connectors") lazy val activemqPekko = module("activemq-pekko", directory = "connectors") .settings( + mimaPreviousArtifacts := Set(), // Remove when 0.4.2 is released name := "pass4s-connector-pekko-activemq", resolvers += "Apache Snapshots" at "https://repository.apache.org/content/repositories/snapshots/", resolvers ++= Resolver.sonatypeOssRepos("snapshots"), From d212b57bb628730bd9d58b0d7dbe0067d6ae799f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pawlik?= Date: Mon, 7 Aug 2023 12:53:17 +0200 Subject: [PATCH 11/11] comment in build.sbt, mention pekko in docs --- build.sbt | 2 +- docs/getting-started.md | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 03e1fa6f..fe3a4a44 100644 --- a/build.sbt +++ b/build.sbt @@ -118,7 +118,7 @@ lazy val activemqPekko = module("activemq-pekko", directory = "connectors") .settings( mimaPreviousArtifacts := Set(), // Remove when 0.4.2 is released name := "pass4s-connector-pekko-activemq", - resolvers += "Apache Snapshots" at "https://repository.apache.org/content/repositories/snapshots/", + resolvers += "Apache Snapshots" at "https://repository.apache.org/content/repositories/snapshots/", // Resolvers to be removed when stable version is released resolvers ++= Resolver.sonatypeOssRepos("snapshots"), libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-connectors-jms" % "0.0.0+140-7d704044-SNAPSHOT", // TODO to be changed to stable release once https://github.com/apache/incubator-pekko-connectors/issues/210 is ready diff --git a/docs/getting-started.md b/docs/getting-started.md index 9c1e3109..bf7cdae4 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -30,9 +30,13 @@ The library is divided into multiple modules. If you're only interested in the b [ActiveMq](https://activemq.apache.org/) ```scala -// ActiveMQ connector +// ActiveMQ connector - based on Akka Alpakka "com.ocadotechnology" %% "pass4s-connector-activemq" % "@VERSION@" +// ActiveMQ pekko connector - based on Pekko Connectors +"com.ocadotechnology" %% "pass4s-connector-pekko-activemq" % "@VERSION@" ``` +⚠️ **Warning** Pekko connector is an experimental addition at the moment, as it is based on nightly build + [SNS/SQS](https://aws.amazon.com/blogs/aws/queues-and-notifications-now-best-friends/) ```scala