From 4a441d0243b094eedaa63d465a657d5f192589c8 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Wed, 7 Aug 2019 11:41:34 +0200 Subject: [PATCH] Docs: move JMS examples to alpakka-samples --- build.sbt | 1 - .../main/java/jms/javasamples/JmsToFile.java | 83 ---------- .../java/jms/javasamples/JmsToHttpGet.java | 96 ------------ .../javasamples/JmsToOneFilePerMessage.java | 94 ------------ .../java/jms/javasamples/JmsToWebSocket.java | 142 ------------------ .../src/main/scala/jms/JmsSampleBase.scala | 25 --- .../src/main/scala/jms/JmsToFile.scala | 54 ------- .../src/main/scala/jms/JmsToHttpGet.scala | 59 -------- .../scala/jms/JmsToOneFilePerMessage.scala | 53 ------- .../src/main/scala/jms/JmsToWebSocket.scala | 91 ----------- .../scala/playground/ActiveMqBroker.scala | 49 ------ .../src/main/scala/playground/WebServer.scala | 108 ------------- docs/src/main/paradox/examples/jms-samples.md | 68 +-------- project/Dependencies.scala | 4 - 14 files changed, 1 insertion(+), 926 deletions(-) delete mode 100644 doc-examples/src/main/java/jms/javasamples/JmsToFile.java delete mode 100644 doc-examples/src/main/java/jms/javasamples/JmsToHttpGet.java delete mode 100644 doc-examples/src/main/java/jms/javasamples/JmsToOneFilePerMessage.java delete mode 100644 doc-examples/src/main/java/jms/javasamples/JmsToWebSocket.java delete mode 100644 doc-examples/src/main/scala/jms/JmsSampleBase.scala delete mode 100644 doc-examples/src/main/scala/jms/JmsToFile.scala delete mode 100644 doc-examples/src/main/scala/jms/JmsToHttpGet.scala delete mode 100644 doc-examples/src/main/scala/jms/JmsToOneFilePerMessage.scala delete mode 100644 doc-examples/src/main/scala/jms/JmsToWebSocket.scala delete mode 100644 doc-examples/src/main/scala/playground/ActiveMqBroker.scala delete mode 100644 doc-examples/src/main/scala/playground/WebServer.scala diff --git a/build.sbt b/build.sbt index 990d37e277..459e24ad10 100644 --- a/build.sbt +++ b/build.sbt @@ -352,7 +352,6 @@ lazy val `doc-examples` = project .dependsOn( files, ftp, - jms, mqtt ) .settings( diff --git a/doc-examples/src/main/java/jms/javasamples/JmsToFile.java b/doc-examples/src/main/java/jms/javasamples/JmsToFile.java deleted file mode 100644 index bc4a26c492..0000000000 --- a/doc-examples/src/main/java/jms/javasamples/JmsToFile.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package jms.javasamples; - -// #sample -import akka.actor.ActorSystem; -import akka.japi.Pair; -import akka.stream.ActorMaterializer; -import akka.stream.IOResult; -import akka.stream.Materializer; -import akka.stream.alpakka.jms.JmsConsumerSettings; -import akka.stream.alpakka.jms.JmsProducerSettings; -import akka.stream.alpakka.jms.javadsl.JmsConsumer; -import akka.stream.alpakka.jms.javadsl.JmsConsumerControl; -import akka.stream.alpakka.jms.javadsl.JmsProducer; -import akka.stream.javadsl.FileIO; -import akka.stream.javadsl.Keep; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import akka.util.ByteString; - -import java.nio.file.Paths; -import java.util.concurrent.CompletionStage; - -// #sample - -import playground.ActiveMqBroker; -import scala.concurrent.ExecutionContext; - -import javax.jms.ConnectionFactory; -import java.util.Arrays; - -public class JmsToFile { - - public static void main(String[] args) throws Exception { - JmsToFile me = new JmsToFile(); - me.run(); - } - - private final ActorSystem system = ActorSystem.create(); - private final Materializer materializer = ActorMaterializer.create(system); - private final ExecutionContext ec = system.dispatcher(); - - private void enqueue(ConnectionFactory connectionFactory, String... msgs) { - Sink jmsSink = - JmsProducer.textSink( - JmsProducerSettings.create(system, connectionFactory).withQueue("test")); - Source.from(Arrays.asList(msgs)).runWith(jmsSink, materializer); - } - - private void run() throws Exception { - ActiveMqBroker activeMqBroker = new ActiveMqBroker(); - activeMqBroker.start(); - - ConnectionFactory connectionFactory = activeMqBroker.createConnectionFactory(); - enqueue(connectionFactory, "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"); - // #sample - - Source jmsSource = // (1) - JmsConsumer.textSource( - JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); - - Sink> fileSink = - FileIO.toPath(Paths.get("target/out.txt")); // (2) - - Pair> pair = - jmsSource // : String - .map(ByteString::fromString) // : ByteString (3) - .toMat(fileSink, Keep.both()) - .run(materializer); - - // #sample - - JmsConsumerControl runningSource = pair.first(); - CompletionStage streamCompletion = pair.second(); - - runningSource.shutdown(); - streamCompletion.thenAccept(res -> system.terminate()); - system.getWhenTerminated().thenAccept(t -> activeMqBroker.stop(ec)); - } -} diff --git a/doc-examples/src/main/java/jms/javasamples/JmsToHttpGet.java b/doc-examples/src/main/java/jms/javasamples/JmsToHttpGet.java deleted file mode 100644 index 7a7b6d064a..0000000000 --- a/doc-examples/src/main/java/jms/javasamples/JmsToHttpGet.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package jms.javasamples; - -// #sample - -import akka.Done; -import akka.actor.ActorSystem; -import akka.http.javadsl.Http; -import akka.http.javadsl.model.HttpRequest; -import akka.japi.Pair; -import akka.stream.ActorMaterializer; -import akka.stream.Materializer; -import akka.stream.alpakka.jms.JmsConsumerSettings; -import akka.stream.alpakka.jms.JmsProducerSettings; -import akka.stream.alpakka.jms.javadsl.JmsConsumer; -import akka.stream.alpakka.jms.javadsl.JmsConsumerControl; -import akka.stream.alpakka.jms.javadsl.JmsProducer; -import akka.stream.javadsl.Keep; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import akka.util.ByteString; -import playground.ActiveMqBroker; -import playground.WebServer; -import scala.concurrent.ExecutionContext; - -import javax.jms.ConnectionFactory; -import java.util.Arrays; -import java.util.concurrent.CompletionStage; - -// #sample - -public class JmsToHttpGet { - - public static void main(String[] args) throws Exception { - JmsToHttpGet me = new JmsToHttpGet(); - me.run(); - } - - private final ActorSystem system = ActorSystem.create(); - private final Materializer materializer = ActorMaterializer.create(system); - private final ExecutionContext ec = system.dispatcher(); - - private void enqueue(ConnectionFactory connectionFactory, String... msgs) { - Sink jmsSink = - JmsProducer.textSink( - JmsProducerSettings.create(system, connectionFactory).withQueue("test")); - Source.from(Arrays.asList(msgs)).runWith(jmsSink, materializer); - } - - private void run() throws Exception { - ActiveMqBroker activeMqBroker = new ActiveMqBroker(); - activeMqBroker.start(); - - WebServer webserver = new WebServer(); - webserver.start("localhost", 8080); - - ConnectionFactory connectionFactory = activeMqBroker.createConnectionFactory(); - enqueue(connectionFactory, "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"); - // #sample - - final Http http = Http.get(system); - - Source jmsSource = // (1) - JmsConsumer.textSource( - JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); - - int parallelism = 4; - Pair> pair = - jmsSource // : String - .map(ByteString::fromString) // : ByteString (2) - .map( - bs -> - HttpRequest.create("http://localhost:8080/hello") - .withEntity(bs)) // : HttpRequest (3) - .mapAsyncUnordered(parallelism, http::singleRequest) // : HttpResponse (4) - .toMat(Sink.foreach(System.out::println), Keep.both()) // (5) - .run(materializer); - // #sample - Thread.sleep(5 * 1000); - JmsConsumerControl runningSource = pair.first(); - CompletionStage streamCompletion = pair.second(); - - runningSource.shutdown(); - streamCompletion.thenAccept(res -> system.terminate()); - system - .getWhenTerminated() - .thenAccept( - t -> { - webserver.stop(); - activeMqBroker.stop(ec); - }); - } -} diff --git a/doc-examples/src/main/java/jms/javasamples/JmsToOneFilePerMessage.java b/doc-examples/src/main/java/jms/javasamples/JmsToOneFilePerMessage.java deleted file mode 100644 index e2a74e3929..0000000000 --- a/doc-examples/src/main/java/jms/javasamples/JmsToOneFilePerMessage.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package jms.javasamples; - -// #sample -import akka.Done; -import akka.actor.ActorSystem; -import akka.japi.Pair; -import akka.stream.ActorMaterializer; -import akka.stream.KillSwitch; -import akka.stream.Materializer; -import akka.stream.alpakka.jms.JmsConsumerSettings; -import akka.stream.alpakka.jms.JmsProducerSettings; -import akka.stream.alpakka.jms.javadsl.JmsConsumer; -import akka.stream.alpakka.jms.javadsl.JmsConsumerControl; -import akka.stream.alpakka.jms.javadsl.JmsProducer; -import akka.stream.javadsl.FileIO; -import akka.stream.javadsl.Keep; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import akka.util.ByteString; - -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.concurrent.CompletionStage; - -// #sample -import playground.ActiveMqBroker; -import scala.concurrent.ExecutionContext; - -import javax.jms.ConnectionFactory; - -public class JmsToOneFilePerMessage { - - public static void main(String[] args) throws Exception { - JmsToOneFilePerMessage me = new JmsToOneFilePerMessage(); - me.run(); - } - - private final ActorSystem system = ActorSystem.create(); - private final Materializer materializer = ActorMaterializer.create(system); - private final ExecutionContext ec = system.dispatcher(); - - private void enqueue(ConnectionFactory connectionFactory, String... msgs) { - Sink jmsSink = - JmsProducer.textSink( - JmsProducerSettings.create(system, connectionFactory).withQueue("test")); - Source.from(Arrays.asList(msgs)).runWith(jmsSink, materializer); - } - - private void run() throws Exception { - ActiveMqBroker activeMqBroker = new ActiveMqBroker(); - activeMqBroker.start(); - - ConnectionFactory connectionFactory = activeMqBroker.createConnectionFactory(); - enqueue(connectionFactory, "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"); - // #sample - - Source jmsConsumer = // (1) - JmsConsumer.textSource( - JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); - - int parallelism = 5; - Pair> pair = - jmsConsumer // : String - .map(ByteString::fromString) // : ByteString (2) - .zipWithIndex() // : Pair (3) - .mapAsyncUnordered( - parallelism, - (in) -> { - ByteString byteString = in.first(); - Long number = in.second(); - return Source // (4) - .single(byteString) - .runWith( - FileIO.toPath(Paths.get("target/out-" + number + ".txt")), materializer); - }) // : IoResult - .toMat(Sink.ignore(), Keep.both()) - .run(materializer); - - // #sample - - KillSwitch runningSource = pair.first(); - CompletionStage streamCompletion = pair.second(); - - Thread.sleep(2 * 1000); - - runningSource.shutdown(); - streamCompletion.thenAccept(res -> system.terminate()); - system.getWhenTerminated().thenAccept(t -> activeMqBroker.stop(ec)); - } -} diff --git a/doc-examples/src/main/java/jms/javasamples/JmsToWebSocket.java b/doc-examples/src/main/java/jms/javasamples/JmsToWebSocket.java deleted file mode 100644 index a4e72aedc6..0000000000 --- a/doc-examples/src/main/java/jms/javasamples/JmsToWebSocket.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package jms.javasamples; - -// #sample - -import akka.Done; -import akka.actor.ActorSystem; -import akka.http.javadsl.Http; -import akka.http.javadsl.model.StatusCodes; -import akka.http.javadsl.model.ws.Message; -import akka.http.javadsl.model.ws.TextMessage; -import akka.http.javadsl.model.ws.WebSocketRequest; -import akka.http.javadsl.model.ws.WebSocketUpgradeResponse; -import akka.japi.Pair; -import akka.stream.ActorMaterializer; -import akka.stream.Materializer; -import akka.stream.alpakka.jms.JmsConsumerSettings; -import akka.stream.alpakka.jms.JmsProducerSettings; -import akka.stream.alpakka.jms.javadsl.JmsConsumer; -import akka.stream.alpakka.jms.javadsl.JmsConsumerControl; -import akka.stream.alpakka.jms.javadsl.JmsProducer; -import akka.stream.javadsl.Flow; -import akka.stream.javadsl.Keep; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import playground.ActiveMqBroker; -import playground.WebServer; -import scala.concurrent.ExecutionContext; - -import javax.jms.ConnectionFactory; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; - -// #sample - -public class JmsToWebSocket { - - public static void main(String[] args) throws Exception { - JmsToWebSocket me = new JmsToWebSocket(); - me.run(); - } - - private final ActorSystem system = ActorSystem.create(); - private final Materializer materializer = ActorMaterializer.create(system); - private final ExecutionContext ec = system.dispatcher(); - - private void enqueue(ConnectionFactory connectionFactory, String... msgs) { - Sink jmsSink = - JmsProducer.textSink( - JmsProducerSettings.create(system, connectionFactory).withQueue("test")); - Source.from(Arrays.asList(msgs)).runWith(jmsSink, materializer); - } - - private void run() throws Exception { - ActiveMqBroker activeMqBroker = new ActiveMqBroker(); - activeMqBroker.start(); - - WebServer webserver = new WebServer(); - webserver.start("localhost", 8080); - - ConnectionFactory connectionFactory = activeMqBroker.createConnectionFactory(); - enqueue(connectionFactory, "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"); - // #sample - - final Http http = Http.get(system); - - Source jmsSource = // (1) - JmsConsumer.textSource( - JmsConsumerSettings.create(system, connectionFactory) - .withBufferSize(10) - .withQueue("test")); - - Flow> webSocketFlow = // (2) - http.webSocketClientFlow(WebSocketRequest.create("ws://localhost:8080/webSocket/ping")); - - int parallelism = 4; - Pair>, CompletionStage> - pair = - jmsSource // : String - .map( - s -> { - Message msg = TextMessage.create(s); - return msg; - }) // : Message (3) - .viaMat(webSocketFlow, Keep.both()) // : Message (4) - .mapAsync(parallelism, this::wsMessageToString) // : String (5) - .map(s -> "client received: " + s) // : String (6) - .toMat(Sink.foreach(System.out::println), Keep.both()) // (7) - .run(materializer); - // #sample - JmsConsumerControl runningSource = pair.first().first(); - CompletionStage wsUpgradeResponse = pair.first().second(); - CompletionStage streamCompletion = pair.second(); - - wsUpgradeResponse - .thenApply( - upgrade -> { - if (upgrade.response().status() == StatusCodes.SWITCHING_PROTOCOLS) { - return "WebSocket established"; - } else { - throw new RuntimeException("Connection failed: " + upgrade.response().status()); - } - }) - .thenAccept(System.out::println); - - Thread.sleep(2 * 1000); - runningSource.shutdown(); - streamCompletion.thenAccept(res -> system.terminate()); - system - .getWhenTerminated() - .thenAccept( - t -> { - webserver.stop(); - activeMqBroker.stop(ec); - }); - } - - // #sample - - /** Convert potentially chunked WebSocket Message to a string. */ - private CompletionStage wsMessageToString(Message msg) { - if (msg.isText()) { - TextMessage tMsg = msg.asTextMessage(); - if (tMsg.isStrict()) { - return CompletableFuture.completedFuture(tMsg.getStrictText()); - } else { - CompletionStage> strings = - tMsg.getStreamedText().runWith(Sink.seq(), materializer); - return strings.thenApply(list -> String.join("", list)); - } - } else { - return CompletableFuture.completedFuture(msg.toString()); - } - } - // #sample - -} diff --git a/doc-examples/src/main/scala/jms/JmsSampleBase.scala b/doc-examples/src/main/scala/jms/JmsSampleBase.scala deleted file mode 100644 index a9239efebe..0000000000 --- a/doc-examples/src/main/scala/jms/JmsSampleBase.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package jms - -import akka.Done -import akka.stream.alpakka.jms.JmsProducerSettings -import akka.stream.alpakka.jms.scaladsl.JmsProducer -import akka.stream.scaladsl.{Sink, Source} -import javax.jms.ConnectionFactory -import playground.ActorSystemAvailable - -import scala.concurrent.Future - -class JmsSampleBase extends ActorSystemAvailable { - - def enqueue(connectionFactory: ConnectionFactory)(msgs: String*): Unit = { - val jmsSink: Sink[String, Future[Done]] = - JmsProducer.textSink( - JmsProducerSettings(actorSystem, connectionFactory).withQueue("test") - ) - Source(msgs.toList).runWith(jmsSink) - } -} diff --git a/doc-examples/src/main/scala/jms/JmsToFile.scala b/doc-examples/src/main/scala/jms/JmsToFile.scala deleted file mode 100644 index 6e07b570c1..0000000000 --- a/doc-examples/src/main/scala/jms/JmsToFile.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package jms - -// #sample -import java.nio.file.Paths - -import akka.stream.IOResult -import akka.stream.alpakka.jms.JmsConsumerSettings -import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl} -import akka.stream.scaladsl.{FileIO, Keep, Sink, Source} -import akka.util.ByteString - -import scala.concurrent.Future -import scala.concurrent.duration.DurationInt -// #sample -import playground.ActiveMqBroker - -object JmsToFile extends JmsSampleBase with App { - - ActiveMqBroker.start() - - val connectionFactory = ActiveMqBroker.createConnectionFactory - enqueue(connectionFactory)("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k") - - // format: off - // #sample - - val jmsSource: Source[String, JmsConsumerControl] = // (1) - JmsConsumer.textSource( - JmsConsumerSettings(actorSystem, connectionFactory).withBufferSize(10).withQueue("test") - ) - - val fileSink: Sink[ByteString, Future[IOResult]] = // (2) - FileIO.toPath(Paths.get("target/out.txt")) - - val (runningSource, finished): (JmsConsumerControl, Future[IOResult]) = - // stream element type - jmsSource //: String - .map(ByteString(_)) //: ByteString (3) - .toMat(fileSink)(Keep.both) - .run() - // #sample - // format: on - wait(1.second) - runningSource.shutdown() - for { - _ <- actorSystem.terminate() - _ <- ActiveMqBroker.stop() - } () - -} diff --git a/doc-examples/src/main/scala/jms/JmsToHttpGet.scala b/doc-examples/src/main/scala/jms/JmsToHttpGet.scala deleted file mode 100644 index 99bb295fdd..0000000000 --- a/doc-examples/src/main/scala/jms/JmsToHttpGet.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package jms - -// #sample -import akka.Done -import akka.http.scaladsl.Http -import akka.http.scaladsl.model._ -import akka.stream.alpakka.jms.JmsConsumerSettings -import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl} -import akka.stream.scaladsl.{Keep, Sink, Source} -import akka.util.ByteString - -import scala.concurrent.Future -import scala.concurrent.duration.DurationInt - -// #sample -import playground.{ActiveMqBroker, WebServer} - -object JmsToHttpGet extends JmsSampleBase with App { - - WebServer.start() - ActiveMqBroker.start() - - val connectionFactory = ActiveMqBroker.createConnectionFactory - enqueue(connectionFactory)("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k") - - // format: off - // #sample - val jmsSource: Source[String, JmsConsumerControl] = // (1) - JmsConsumer.textSource( - JmsConsumerSettings(actorSystem,connectionFactory).withBufferSize(10).withQueue("test") - ) - - val (runningSource, finished): (JmsConsumerControl, Future[Done]) = - jmsSource //: String - .map(ByteString(_)) //: ByteString (2) - .map { bs => - HttpRequest(uri = Uri("http://localhost:8080/hello"), //: HttpRequest (3) - entity = HttpEntity(bs)) - } - .mapAsyncUnordered(4)(Http().singleRequest(_)) //: HttpResponse (4) - .toMat(Sink.foreach(println))(Keep.both) // (5) - .run() - // #sample - // format: on - finished.foreach(_ => println("stream finished")) - - wait(5.seconds) - runningSource.shutdown() - for { - _ <- actorSystem.terminate() - _ <- WebServer.stop() - _ <- ActiveMqBroker.stop() - } () - -} diff --git a/doc-examples/src/main/scala/jms/JmsToOneFilePerMessage.scala b/doc-examples/src/main/scala/jms/JmsToOneFilePerMessage.scala deleted file mode 100644 index 07b5aeae11..0000000000 --- a/doc-examples/src/main/scala/jms/JmsToOneFilePerMessage.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package jms - -// #sample -import java.nio.file.Paths - -import akka.stream.alpakka.jms.JmsConsumerSettings -import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl} -import akka.stream.scaladsl.{FileIO, Keep, Sink, Source} -import akka.util.ByteString - -import scala.concurrent.duration.DurationInt -// #sample -import playground.ActiveMqBroker - -object JmsToOneFilePerMessage extends JmsSampleBase with App { - - ActiveMqBroker.start() - - val connectionFactory = ActiveMqBroker.createConnectionFactory - enqueue(connectionFactory)("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k") - - // format: off - // #sample - - val jmsSource: Source[String, JmsConsumerControl] = // (1) - JmsConsumer.textSource( - JmsConsumerSettings(actorSystem, connectionFactory).withBufferSize(10).withQueue("test") - ) - // stream element type - val runningSource = jmsSource //: String - .map(ByteString(_)) //: ByteString (2) - .zipWithIndex //: (ByteString, Long) (3) - .mapAsyncUnordered(parallelism = 5) { case (byteStr, number) => - Source // (4) - .single(byteStr) - .runWith(FileIO.toPath(Paths.get(s"target/out-$number.txt"))) - } //: IoResult - .toMat(Sink.ignore)(Keep.left) - .run() - // #sample - // format: on - wait(1.second) - runningSource.shutdown() - for { - _ <- actorSystem.terminate() - _ <- ActiveMqBroker.stop() - } () - -} diff --git a/doc-examples/src/main/scala/jms/JmsToWebSocket.scala b/doc-examples/src/main/scala/jms/JmsToWebSocket.scala deleted file mode 100644 index cfd0b841f3..0000000000 --- a/doc-examples/src/main/scala/jms/JmsToWebSocket.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package jms - -// #sample -import akka.Done -import akka.http.scaladsl.Http -import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.ws.{WebSocketRequest, WebSocketUpgradeResponse} -import akka.stream.alpakka.jms.JmsConsumerSettings -import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl} -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} - -import scala.concurrent.Future -// #sample -import playground.{ActiveMqBroker, WebServer} - -import scala.concurrent.duration.DurationInt - -object JmsToWebSocket extends JmsSampleBase with App { - - ActiveMqBroker.start() - WebServer.start() - - val connectionFactory = ActiveMqBroker.createConnectionFactory - enqueue(connectionFactory)("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k") - - // format: off - // #sample - - val jmsSource: Source[String, JmsConsumerControl] = - JmsConsumer.textSource( // (1) - JmsConsumerSettings(actorSystem, connectionFactory).withBufferSize(10).withQueue("test") - ) - - val webSocketFlow: Flow[ws.Message, ws.Message, Future[WebSocketUpgradeResponse]] = // (2) - Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080/webSocket/ping")) - - val ((runningSource, wsUpgradeResponse), streamCompletion): ((JmsConsumerControl, Future[WebSocketUpgradeResponse]), Future[Done]) = - // stream element type - jmsSource //: String - .map(ws.TextMessage(_)) //: ws.TextMessage (3) - .viaMat(webSocketFlow)(Keep.both) //: ws.TextMessage (4) - .mapAsync(1)(wsMessageToString) //: String (5) - .map("client received: " + _) //: String (6) - .toMat(Sink.foreach(println))(Keep.both) // (7) - .run() - // #sample - // format: on - - wsUpgradeResponse - .map { upgrade => - if (upgrade.response.status == StatusCodes.SwitchingProtocols) { - "WebSocket established" - } else { - throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") - } - } - .onComplete(println) - - wait(5.seconds) - runningSource.shutdown() - - for { - _ <- streamCompletion - _ <- actorSystem.terminate() - _ <- WebServer.stop() - _ <- ActiveMqBroker.stop() - } () - - // #sample - - /** - * Convert potentially chunked WebSocket Message to a string. - */ - def wsMessageToString: ws.Message => Future[String] = { - case message: ws.TextMessage.Strict => - Future.successful(message.text) - - case message: ws.TextMessage.Streamed => - val seq = message.textStream.runWith(Sink.seq) - seq.map(seq => seq.mkString) - - case message => - Future.successful(message.toString) - } - // #sample - -} diff --git a/doc-examples/src/main/scala/playground/ActiveMqBroker.scala b/doc-examples/src/main/scala/playground/ActiveMqBroker.scala deleted file mode 100644 index 10d01851d7..0000000000 --- a/doc-examples/src/main/scala/playground/ActiveMqBroker.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package playground - -import javax.jms.ConnectionFactory - -import akka.Done -import org.apache.activemq.ActiveMQConnectionFactory -import org.apache.activemq.broker.BrokerService - -import scala.concurrent.{ExecutionContext, Future} - -/** - * To start an ActiveMQ broker be sure to include these dependencies: - * - * "javax.jms" % "jms" % "1.1", - * "org.apache.activemq" % "activemq-all" % "5.14.4" - */ -class ActiveMqBroker { - - var brokerService: Option[BrokerService] = None - - def start(): BrokerService = { - val broker = new BrokerService() - broker.setBrokerName("localhost") - broker.setUseJmx(false) - broker.start() - brokerService = Some(broker) - broker - } - - def stop()(implicit ec: ExecutionContext): Future[Done] = - brokerService.fold(Future.successful(Done)) { broker => - Future { - broker.stop() - scala.concurrent.blocking { - broker.waitUntilStopped() - } - Done - } - } - - def createConnectionFactory: ConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false") - -} - -object ActiveMqBroker extends ActiveMqBroker diff --git a/doc-examples/src/main/scala/playground/WebServer.scala b/doc-examples/src/main/scala/playground/WebServer.scala deleted file mode 100644 index 9b0376174e..0000000000 --- a/doc-examples/src/main/scala/playground/WebServer.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package playground - -import akka.actor.{ActorSystem, Terminated} -import akka.event.Logging -import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage} -import akka.http.scaladsl.model.{ContentTypes, HttpEntity} -import akka.http.scaladsl.server.{HttpApp, Route} -import akka.http.scaladsl.settings.ServerSettings -import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source} -import akka.stream.{ActorMaterializer, FlowShape} -import akka.{Done, NotUsed} - -import scala.concurrent.duration.DurationInt -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.util.Success - -class WebServer extends HttpApp { - implicit val theSystem = ActorSystem(Logging.simpleName(this).replaceAll("\\$", "")) - implicit val materializer = ActorMaterializer() - implicit val executionContext = theSystem.dispatcher - - private val shutdownPromise = Promise[Done] - - /** Override to do something more interesting on Web socket messages - * http://doc.akka.io/docs/akka-http/current/scala/http/websocket-support.html#routing-support - */ - def websocket: Flow[Message, Message, Any] = - Flow[Message].mapConcat { - case tm: TextMessage => - println(s"Web server received web socket message: $tm") - TextMessage( - Source - .single("Hello ") - .concat(tm.textStream) - .concat(Source.single("!")) - ) :: Nil - case bm: BinaryMessage => - // ignore binary messages but drain content to avoid the stream being clogged - bm.dataStream.runWith(Sink.ignore) - Nil - } - - /** - * Sends out messages on the websocket. - */ - def outgoing: Flow[Message, Message, NotUsed] = { - val routingGraph: Flow[Message, Message, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b => - val in = b.add(Sink.ignore) - val out = b.add(Source.tick(2.seconds, 10.seconds, TextMessage("Tick"))) - FlowShape(in.in, out.out) - }) - Flow[Message].via(routingGraph) - } - - /** - * @see http://doc.akka.io/docs/akka-http/current/scala/http/routing-dsl/overview.html - * http://doc.akka.io/docs/akka-http/current/scala/http/routing-dsl/directives/alphabetically.html - */ - override def routes: Route = - pathSingleSlash { - complete { - println("Web server received GET /") - HttpEntity(ContentTypes.`text/html(UTF-8)`, "Welcome to the playground!") - } - } ~ - path("hello") { - get { ctx => - ctx.complete { - println(s"Web server received ${ctx.request}") - HttpEntity(ContentTypes.`application/json`, """{ msg: "Hi!" }""") - } - } - } ~ - pathPrefix("webSocket") { - path("ping") { - // connect e.g. with Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080/webSocket/ping")) - println("Web server received webSocket/ping connect") - handleWebSocketMessages(websocket) - } // ~ -// path("outgoing") { -// handleWebSocketMessages(outgoing) -// } - } - - override protected def postHttpBindingFailure(cause: Throwable): Unit = - println(s"The server could not be started due to $cause") - - def start(host: String = "localhost", port: Int = 8080): Future[Done] = { - val settings = ServerSettings(theSystem.settings.config) - Future { - startServer(host, port, settings, theSystem) - }.map(_ => Done) - } - - override protected def waitForShutdownSignal(system: ActorSystem)(implicit ec: ExecutionContext): Future[Done] = - shutdownPromise.future - - def stop(): Future[Terminated] = { - shutdownPromise.tryComplete(Success(Done)) - theSystem.terminate() - } -} - -object WebServer extends WebServer diff --git a/docs/src/main/paradox/examples/jms-samples.md b/docs/src/main/paradox/examples/jms-samples.md index 67e68e24f4..ae35298116 100644 --- a/docs/src/main/paradox/examples/jms-samples.md +++ b/docs/src/main/paradox/examples/jms-samples.md @@ -1,69 +1,3 @@ # JMS -### Example: Read text messages from JMS queue and append to file - -- listens to the JMS queue "test" receiving `String`s (1), -- converts incoming data to `akka.util.ByteString` (3), -- and appends the data to the file `target/out` (2). - -Scala -: @@snip [snip](/doc-examples/src/main/scala/jms/JmsToFile.scala) { #sample } - -Java -: @@snip [snip](/doc-examples/src/main/java/jms/javasamples/JmsToFile.java) { #sample } - -### Example: Read text messages from JMS queue and create one file per message - -- listens to the JMS queue "test" receiving `String`s (1), -- converts incoming data to `akka.util.ByteString` (2), -- combines the incoming data with a counter (3), -- creates an intermediary stream writing the incoming data to a file using the counter -value to create unique file names (4). - -Scala -: @@snip [snip](/doc-examples/src/main/scala/jms/JmsToOneFilePerMessage.scala) { #sample } - -Java -: @@snip [snip](/doc-examples/src/main/java/jms/javasamples/JmsToOneFilePerMessage.java) { #sample } - -### Example: Read text messages from JMS queue and send to web server - -- listens to the JMS queue "test" receiving `String`s (1), -- converts incoming data to `akka.util.ByteString` (2), -- puts the received text into an `HttpRequest` (3), -- sends the created request via Akka Http (4), -- prints the `HttpResponse` to standard out (5). - -Scala -: @@snip [snip](/doc-examples/src/main/scala/jms/JmsToHttpGet.scala) { #sample } - -Java -: @@snip [snip](/doc-examples/src/main/java/jms/javasamples/JmsToHttpGet.java) { #sample } - -### Example: Read text messages from JMS queue and send to web socket - -- listens to the JMS queue "test" receiving `String`s (1), -- configures a web socket flow to localhost (2), -- converts incoming data to a @scala[@scaladoc[ws.TextMessage](akka.http.scaladsl.model.ws.TextMessage)]@java[@scaladoc[akka.http.javadsl.model.ws.TextMessage](akka.http.javadsl.model.ws.TextMessage)] (3), -- pass the message via the web socket flow (4), -- convert the (potentially chunked) web socket reply to a `String` (5), -- prefix the `String` (6), -- end the stream by writing the values to standard out (7). - -Scala -: @@snip [snip](/doc-examples/src/main/scala/jms/JmsToWebSocket.scala) { #sample } - -Java -: @@snip [snip](/doc-examples/src/main/java/jms/javasamples/JmsToWebSocket.java) { #sample } - -### Running the example code - -This example is contained in a stand-alone runnable main, it can be run - from `sbt` like this: - - -Scala -: ``` - sbt - > doc-examples/run - ``` +JMS examples are shown in the [Alpakka Samples](https://akka.io/alpakka-samples/jms/) project. diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ac25069720..8ffd0571ac 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -103,10 +103,6 @@ object Dependencies { "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion, // https://github.com/akka/alpakka-kafka/releases "com.typesafe.akka" %% "akka-stream-kafka" % "1.0.5", - // https://github.com/javaee/javax.jms - "javax.jms" % "jms" % "1.1", // CDDL Version 1.1 - // http://activemq.apache.org/download.html - "org.apache.activemq" % "activemq-all" % "5.15.4" exclude ("log4j", "log4j") exclude ("org.slf4j", "slf4j-log4j12"), // ApacheV2 "io.netty" % "netty-all" % "4.1.29.Final", // ApacheV2 "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.9.9", "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.9.9",