Skip to content

Commit

Permalink
extensive rewrite of tests to use ActiveMQ Artemis
Browse files Browse the repository at this point in the history
* because support for Jakarta Messaging was addd in ActiveMQ 6.0,
  which requires JDK 17
* using Artermis EmbeddedActiveMQ broker
  • Loading branch information
patriknw committed Nov 30, 2023
1 parent 9fbf41f commit 139f1f2
Show file tree
Hide file tree
Showing 20 changed files with 121 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ private[jakartajms] final class JmsConsumerStage(settings: JmsConsumerSettings,
createDestination: jms.Session => jakarta.jms.Destination): JmsConsumerSession = {
val session =
connection.createSession(false, settings.acknowledgeMode.getOrElse(AcknowledgeMode.AutoAcknowledge).mode)

new JmsConsumerSession(connection, session, createDestination(session), self.destination)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import akka.stream.javadsl.Source;
import akka.testkit.javadsl.TestKit;
import com.typesafe.config.Config;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import akka.testkit.javadsl.TestKit;
import jakartajmstestkit.JmsBroker;
import com.typesafe.config.Config;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
Expand Down
16 changes: 6 additions & 10 deletions jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import akka.testkit.javadsl.TestKit;
import com.typesafe.config.Config;
import jakartajmstestkit.JmsBroker;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
Expand Down Expand Up @@ -162,8 +162,8 @@ public void publishAndConsumeJmsObjectMessage() throws Exception {
// #object-source
ActiveMQConnectionFactory connectionFactory =
(ActiveMQConnectionFactory) server.createConnectionFactory();
connectionFactory.setTrustedPackages(
Arrays.asList(DummyJavaTests.class.getPackage().getName()));
connectionFactory.setDeserializationWhiteList(
DummyJavaTests.class.getPackage().getName());

// #object-source
// #connection-factory-object
Expand Down Expand Up @@ -658,11 +658,7 @@ public void browse() throws Exception {
result.toCompletableFuture().get().stream()
.map(
message -> {
try {
return ((ActiveMQTextMessage) message).getText();
} catch (JMSException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());

Expand Down
8 changes: 5 additions & 3 deletions jakarta-jms/src/test/java/docs/javadsl/JmsSettingsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import akka.stream.alpakka.jakartajms.*;
import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4;
import com.typesafe.config.ConfigFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -59,10 +59,11 @@ public void producerSettings() throws Exception {
SendRetrySettings sendRetrySettings2 = SendRetrySettings.create(sendRetryConfig);
assertEquals(sendRetrySettings.toString(), sendRetrySettings2.toString());

String brokerUrl = "vm://0";
// #producer-settings
Config producerConfig = config.getConfig(JmsProducerSettings.configPath());
JmsProducerSettings settings =
JmsProducerSettings.create(producerConfig, new ActiveMQConnectionFactory("broker-url"))
JmsProducerSettings.create(producerConfig, new ActiveMQConnectionFactory(brokerUrl))
.withTopic("target-topic")
.withCredentials(Credentials.create("username", "password"))
.withConnectionRetrySettings(retrySettings)
Expand All @@ -78,10 +79,11 @@ public void consumerSettings() throws Exception {
Config connectionRetryConfig = config.getConfig("alpakka.jakarta-jms.connection-retry");
ConnectionRetrySettings retrySettings = ConnectionRetrySettings.create(connectionRetryConfig);

String brokerUrl = "vm://0";
// #consumer-settings
Config consumerConfig = config.getConfig(JmsConsumerSettings.configPath());
JmsConsumerSettings settings =
JmsConsumerSettings.create(consumerConfig, new ActiveMQConnectionFactory("broker-url"))
JmsConsumerSettings.create(consumerConfig, new ActiveMQConnectionFactory(brokerUrl))
.withTopic("message-topic")
.withCredentials(Credentials.create("username", "password"))
.withConnectionRetrySettings(retrySettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import akka.testkit.javadsl.TestKit;
import jakartajmstestkit.JmsBroker;
import com.typesafe.config.Config;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ class JmsProducerRetrySpec extends JmsSpec {
server.stop() // crash.

Thread.sleep(1000)
// https://activemq.apache.org/how-do-i-restart-embedded-broker.html
server.service.waitUntilStopped()
server.start(true) // recover.
server.restart() // recover.
val restartTime = System.currentTimeMillis()
for (_ <- 1 to 10) queue.offer(1) // 10 after the crash
queue.complete()
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.stream.alpakka.jakartajms

import scala.util.Random

import akka.actor.ActorSystem
import akka.stream.alpakka.testkit.scaladsl.LogCapturing
import akka.testkit.TestKit
Expand All @@ -14,7 +16,6 @@ import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import jakarta.jms._

abstract class JmsSpec
Expand Down Expand Up @@ -78,4 +79,6 @@ abstract class JmsSpec

def withMockedConsumer(test: ConsumerMock => Unit): Unit = test(ConsumerMock())

def createName(prefix: String) = prefix + Random.nextInt().toString

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package akka.stream.alpakka.jakartajms.scaladsl

import jakarta.jms.{Connection, ConnectionFactory}

import org.apache.activemq.ActiveMQConnection
import org.apache.activemq.artemis.jms.client.ActiveMQConnection

/**
* a silly cached connection factory, not thread safe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ class JmsAckConnectorsSpec extends JmsSpec {
probe.requestNext(convertSpanToDuration(patienceConfig.timeout)) shouldBe Some(aMessage)

eventually {
server.service.checkQueueSize(testQueue) shouldBe true //queue is empty
server.getQueueSize(testQueue) shouldBe 0 //queue is empty
}

consumerControl.shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,36 @@

package docs.scaladsl

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success

import akka.Done
import akka.stream.KillSwitches
import akka.stream.ThrottleMode
import akka.stream.alpakka.jakartajms._
import akka.stream.alpakka.jakartajms.scaladsl.{JmsConsumer, JmsConsumerControl, JmsProducer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.alpakka.jakartajms.scaladsl.JmsConsumer
import akka.stream.alpakka.jakartajms.scaladsl.JmsConsumerControl
import akka.stream.alpakka.jakartajms.scaladsl.JmsProducer
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{KillSwitches, ThrottleMode}
import org.apache.activemq.ActiveMQSession
import jakarta.jms.{JMSException, TextMessage}
import jakarta.jms.JMSException
import jakarta.jms.TextMessage
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants
import org.scalatest.Inspectors._
import org.scalatest.time.Span.convertSpanToDuration

import scala.annotation.tailrec
import scala.collection.{immutable, mutable}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

class JmsBufferedAckConnectorsSpec extends JmsSharedServerSpec {
class JmsBufferedAckConnectorsSpec extends JmsSpec {

override implicit val patienceConfig: PatienceConfig = PatienceConfig(2.minutes)

Expand Down Expand Up @@ -309,7 +319,7 @@ class JmsBufferedAckConnectorsSpec extends JmsSharedServerSpec {
.run()

// We need this ack mode for AMQ to not lose messages as ack normally acks any messages read on the session.
val individualAck = new AcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE)
val individualAck = new AcknowledgeMode(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE)

val jmsSource: Source[AckEnvelope, JmsConsumerControl] = JmsConsumer.ackSource(
JmsConsumerSettings(consumerConfig, connectionFactory)
Expand All @@ -336,8 +346,7 @@ class JmsBufferedAckConnectorsSpec extends JmsSharedServerSpec {
val ex = new Exception("Test exception")
killSwitch.abort(ex)

import scala.concurrent.ExecutionContext.Implicits.global

import system.dispatcher
val resultTry = streamDone.map(Success(_)).recover { case e => Failure(e) }.futureValue

// Keep publishing for another 2 seconds to make sure we killed the consumption mid-stream.
Expand Down Expand Up @@ -377,7 +386,8 @@ class JmsBufferedAckConnectorsSpec extends JmsSharedServerSpec {
resultList.toSet should contain.theSameElementsAs(numsIn.map(_.toString))
}

"send acknowledgments back to the broker after max.ack.interval" in withConnectionFactory() { connectionFactory =>
"send acknowledgments back to the broker after max.ack.interval" in withServer() { server =>
val connectionFactory = server.createTopicConnectionFactory
val testQueue = "test"
val aMessage = "message"
val maxAckInterval = 1.second
Expand Down Expand Up @@ -406,7 +416,7 @@ class JmsBufferedAckConnectorsSpec extends JmsSharedServerSpec {
probe.requestNext(convertSpanToDuration(patienceConfig.timeout)) shouldBe Some(aMessage)

eventually {
isQueueEmpty(testQueue) shouldBe true
server.getQueueSize(testQueue) shouldBe 0
}

consumerControl.shutdown()
Expand Down
27 changes: 17 additions & 10 deletions jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.{Done, NotUsed}
import jakarta.jms._

import org.apache.activemq.command.ActiveMQQueue
import org.apache.activemq.{ActiveMQConnectionFactory, ActiveMQSession}
import org.apache.activemq.artemis.jms.client.ActiveMQQueue
import org.apache.activemq.artemis.jms.client.{ActiveMQConnectionFactory, ActiveMQSession}
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.collection.mutable
import scala.concurrent.Future
Expand All @@ -37,13 +36,19 @@ class JmsConnectorsSpec extends JmsSpec {

override implicit val patienceConfig: PatienceConfig = PatienceConfig(2.minutes)

private def assertClosed(connectionFactory: CachedConnectionFactory): Unit = {
intercept[JMSException] {
connectionFactory.cachedConnection.getClientID
}.getMessage shouldBe "Connection is closed"
}

"The JMS Connectors" should {
"publish and consume strings through a queue" in withServer() { server =>
val url = server.brokerUri
//#connection-factory
//#text-sink
//#text-source
val connectionFactory: jakarta.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)
val connectionFactory: jakarta.jms.ConnectionFactory = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(url)
//#connection-factory
//#text-sink
//#text-source
Expand Down Expand Up @@ -76,7 +81,7 @@ class JmsConnectorsSpec extends JmsSpec {
//#object-sink
//#object-source
val connectionFactory = connFactory.asInstanceOf[ActiveMQConnectionFactory]
connectionFactory.setTrustedPackages(List(classOf[DummyObject].getPackage.getName).asJava)
connectionFactory.setDeserializationWhiteList(classOf[DummyObject].getPackage.getName)
//#object-sink
//#object-source

Expand Down Expand Up @@ -536,7 +541,9 @@ class JmsConnectorsSpec extends JmsSpec {
val completionFuture: Future[Done] = Source(msgsIn).runWith(jmsSink)
completionFuture.futureValue shouldBe Done
// make sure connection was closed
eventually { connectionFactory.cachedConnection shouldBe Symbol("closed") }
eventually {
assertClosed(connectionFactory)
}
}

"sink exceptional completion" in withConnectionFactory() { connFactory =>
Expand All @@ -554,7 +561,7 @@ class JmsConnectorsSpec extends JmsSpec {

completionFuture.failed.futureValue shouldBe a[RuntimeException]
// make sure connection was closed
eventually { connectionFactory.cachedConnection shouldBe Symbol("closed") }
eventually { assertClosed(connectionFactory) }
}

"producer disconnect exceptional completion" in withServer() { server =>
Expand Down Expand Up @@ -588,7 +595,7 @@ class JmsConnectorsSpec extends JmsSpec {
// - not yet initialized before broker stop, or
// - closed on broker stop (if preStart came first).
if (connectionFactory.cachedConnection != null) {
connectionFactory.cachedConnection shouldBe Symbol("closed")
assertClosed(connectionFactory)
}
}

Expand Down Expand Up @@ -837,7 +844,7 @@ class JmsConnectorsSpec extends JmsSpec {
}

"fail if message destination is not defined" in {
val connectionFactory = new ActiveMQConnectionFactory("localhost:1234")
val connectionFactory = new ActiveMQConnectionFactory("vm://13")

an[IllegalArgumentException] shouldBe thrownBy {
JmsProducer.flow(JmsProducerSettings(producerConfig, connectionFactory))
Expand Down Expand Up @@ -1170,7 +1177,7 @@ class JmsConnectorsSpec extends JmsSpec {
}

"publish and subscribe with a durable subscription" in withServer() { server =>
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
val producerConnectionFactory = server.createConnectionFactory
//#create-connection-factory-with-client-id
val consumerConnectionFactory = server.createConnectionFactory.asInstanceOf[ActiveMQConnectionFactory]
Expand Down
Loading

0 comments on commit 139f1f2

Please sign in to comment.