From 68351f4c8d563f6f5e78c7af6e809cf48716483b Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Fri, 5 Jul 2019 16:31:25 +0200 Subject: [PATCH] Move out Elasticsearch example to Alpakka-samples --- build.sbt | 95 +++++++------- ...hUsingSlickAndStreamIntoElasticInJava.java | 123 ------------------ .../FetchUsingSlickAndStreamIntoElastic.scala | 78 ----------- .../playground/ElasticSearchEmbedded.scala | 24 ---- .../elastic/ElasticsearchMock.scala | 53 -------- .../paradox/examples/elasticsearch-samples.md | 31 +---- project/Dependencies.scala | 9 +- 7 files changed, 50 insertions(+), 363 deletions(-) delete mode 100644 doc-examples/src/main/java/elastic/FetchUsingSlickAndStreamIntoElasticInJava.java delete mode 100644 doc-examples/src/main/scala/elastic/FetchUsingSlickAndStreamIntoElastic.scala delete mode 100644 doc-examples/src/main/scala/playground/ElasticSearchEmbedded.scala delete mode 100644 doc-examples/src/main/scala/playground/elastic/ElasticsearchMock.scala diff --git a/build.sbt b/build.sbt index a9f1954dae..19f7a1ff99 100644 --- a/build.sbt +++ b/build.sbt @@ -1,54 +1,52 @@ import Whitesource.whitesourceGroup -lazy val modules: Seq[ProjectReference] = Seq( - amqp, - avroparquet, - awslambda, - azureStorageQueue, - cassandra, - couchbase, - csv, - dynamodb, - elasticsearch, - files, - ftp, - geode, - googleCloudPubSub, - googleCloudPubSubGrpc, - googleCloudStorage, - googleFcm, - hbase, - hdfs, - influxdb, - ironmq, - jms, - jsonStreaming, - kinesis, - kudu, - mongodb, - mqtt, - mqttStreaming, - orientdb, - reference, - s3, - springWeb, - simpleCodecs, - slick, - sns, - solr, - sqs, - sse, - text, - udp, - unixdomainsocket, - xml -) - lazy val alpakka = project .in(file(".")) .enablePlugins(ScalaUnidocPlugin) .disablePlugins(MimaPlugin, SitePlugin) - .aggregate(modules: _*) + .aggregate( + amqp, + avroparquet, + awslambda, + azureStorageQueue, + cassandra, + couchbase, + csv, + dynamodb, + elasticsearch, + files, + ftp, + geode, + googleCloudPubSub, + googleCloudPubSubGrpc, + googleCloudStorage, + googleFcm, + hbase, + hdfs, + influxdb, + ironmq, + jms, + jsonStreaming, + kinesis, + kudu, + mongodb, + mqtt, + mqttStreaming, + orientdb, + reference, + s3, + springWeb, + simpleCodecs, + slick, + sns, + solr, + sqs, + sse, + text, + udp, + unixdomainsocket, + xml + ) .aggregate(`doc-examples`) .settings( onLoadMessage := @@ -103,7 +101,7 @@ lazy val couchbase = parallelExecution in Test := false, whitesourceGroup := Whitesource.Group.Supported) -lazy val csv = alpakkaProject("csv", "csv", Dependencies.Csv, whitesourceGroup := Whitesource.Group.Supported) +lazy val csv = alpakkaProject("csv", "csv", whitesourceGroup := Whitesource.Group.Supported) lazy val csvBench = alpakkaProject("csv-bench", "csvBench", publish / skip := true) .dependsOn(csv) @@ -359,7 +357,10 @@ lazy val `doc-examples` = project .enablePlugins(AutomateHeaderPlugin) .disablePlugins(BintrayPlugin, MimaPlugin, SitePlugin) .dependsOn( - modules.map(p => classpathDependency(p)): _* + files, + ftp, + jms, + mqtt ) .settings( name := s"akka-stream-alpakka-doc-examples", diff --git a/doc-examples/src/main/java/elastic/FetchUsingSlickAndStreamIntoElasticInJava.java b/doc-examples/src/main/java/elastic/FetchUsingSlickAndStreamIntoElasticInJava.java deleted file mode 100644 index 18865a5cbc..0000000000 --- a/doc-examples/src/main/java/elastic/FetchUsingSlickAndStreamIntoElasticInJava.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package elastic; - -// #sample -import akka.Done; -import akka.actor.ActorSystem; -import akka.stream.ActorMaterializer; -import akka.stream.Materializer; - -import akka.stream.alpakka.elasticsearch.ElasticsearchWriteSettings; -import akka.stream.alpakka.elasticsearch.WriteMessage; -import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSink; -import org.apache.http.HttpHost; -import org.elasticsearch.client.RestClient; - -import akka.stream.alpakka.slick.javadsl.Slick; -import akka.stream.alpakka.slick.javadsl.SlickRow; -import akka.stream.alpakka.slick.javadsl.SlickSession; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; -import java.util.concurrent.CompletionStage; - -// #sample - -import org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner; -import playground.ElasticSearchEmbedded; -import playground.elastic.ElasticsearchMock; - -public class FetchUsingSlickAndStreamIntoElasticInJava { - - public static void main(String[] args) { - FetchUsingSlickAndStreamIntoElasticInJava me = new FetchUsingSlickAndStreamIntoElasticInJava(); - me.run(); - } - - public - // #sample - - static class Movie { // (3) - public final int id; - public final String title; - public final String genre; - public final double gross; - - @JsonCreator - public Movie( - @JsonProperty("id") int id, - @JsonProperty("title") String title, - @JsonProperty("genre") String genre, - @JsonProperty("gross") double gross) { - this.id = id; - this.title = title; - this.genre = genre; - this.gross = gross; - } - } - - // #sample - - void run() { - ElasticsearchClusterRunner runner = ElasticSearchEmbedded.startElasticInstance(); - - // #sample - ActorSystem system = ActorSystem.create(); - Materializer materializer = ActorMaterializer.create(system); - - SlickSession session = SlickSession.forConfig("slick-h2-mem"); // (1) - system.registerOnTermination(session::close); - - // #sample - ElasticsearchMock.populateDataForTable(session, materializer); - - // #sample - RestClient elasticSearchClient = - RestClient.builder(new HttpHost("localhost", 9201)).build(); // (4) - - final ObjectMapper objectToJsonMapper = new ObjectMapper(); // (5) - - final CompletionStage done = - Slick.source( // (6) - session, - "SELECT * FROM MOVIE", - (SlickRow row) -> - new Movie(row.nextInt(), row.nextString(), row.nextString(), row.nextDouble())) - .map(movie -> WriteMessage.createIndexMessage(String.valueOf(movie.id), movie)) // (8) - .runWith( - ElasticsearchSink.create( // (9) - "movie", - "boxoffice", - ElasticsearchWriteSettings.Default(), - elasticSearchClient, - objectToJsonMapper), - materializer); - - done.thenRunAsync( - () -> { - try { - elasticSearchClient.close(); // (10) - } catch (IOException ignored) { - ignored.printStackTrace(); - } - }, - system.dispatcher()) - // #sample - .thenRunAsync( - () -> { - try { - runner.close(); - } catch (IOException ignored) { - ignored.printStackTrace(); - } - runner.clean(); - system.terminate(); - }); - } -} diff --git a/doc-examples/src/main/scala/elastic/FetchUsingSlickAndStreamIntoElastic.scala b/doc-examples/src/main/scala/elastic/FetchUsingSlickAndStreamIntoElastic.scala deleted file mode 100644 index 985ada826b..0000000000 --- a/doc-examples/src/main/scala/elastic/FetchUsingSlickAndStreamIntoElastic.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package elastic - -// #sample -import akka.Done -import akka.stream.alpakka.elasticsearch.WriteMessage._ -import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink -import org.apache.http.HttpHost -import org.elasticsearch.client.RestClient -import akka.stream.alpakka.slick.javadsl.SlickSession -import akka.stream.alpakka.slick.scaladsl.Slick -import spray.json.DefaultJsonProtocol._ -import spray.json.JsonFormat - -import scala.concurrent.Future -import scala.concurrent.duration._ -// #sample - -import playground.elastic.ElasticsearchMock -import playground.{ActorSystemAvailable, ElasticSearchEmbedded} - -object FetchUsingSlickAndStreamIntoElastic extends ActorSystemAvailable with App { - - val runner = ElasticSearchEmbedded.startElasticInstance() - // format: off - // #sample - - implicit val session = SlickSession.forConfig("slick-h2-mem") // (1) - actorSystem.registerOnTermination(session.close()) - - import session.profile.api._ - // #sample - // format: on - ElasticsearchMock.populateDataForTable() - - // format: off - // #sample - class Movies(tag: Tag) extends Table[(Int, String, String, Double)](tag, "MOVIE") { // (2) - def id = column[Int]("ID") - def title = column[String]("TITLE") - def genre = column[String]("GENRE") - def gross = column[Double]("GROSS") - - override def * = (id, title, genre, gross) - } - - case class Movie(id: Int, title: String, genre: String, gross: Double) // (3) - - implicit val elasticSearchClient: RestClient = - RestClient.builder(new HttpHost("localhost", 9201)).build() // (4) - implicit val format: JsonFormat[Movie] = jsonFormat4(Movie) // (5) - - val done: Future[Done] = - Slick - .source(TableQuery[Movies].result) // (6) - .map { // (7) - case (id, genre, title, gross) => Movie(id, genre, title, gross) - } - .map(movie => createIndexMessage(movie.id.toString, movie)) // (8) - .runWith(ElasticsearchSink.create[Movie]("movie", "_doc")) // (9) - - done.onComplete { - case _ => - elasticSearchClient.close() // (10) - } - // #sample - // format: on - done.onComplete { - case _ => - runner.close() - runner.clean() - } - wait(10.seconds) - terminateActorSystem() -} diff --git a/doc-examples/src/main/scala/playground/ElasticSearchEmbedded.scala b/doc-examples/src/main/scala/playground/ElasticSearchEmbedded.scala deleted file mode 100644 index 4e60e394d4..0000000000 --- a/doc-examples/src/main/scala/playground/ElasticSearchEmbedded.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package playground - -import org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner - -object ElasticSearchEmbedded { - private val runner = new ElasticsearchClusterRunner() - - def startElasticInstance() = { - runner.build( - ElasticsearchClusterRunner - .newConfigs() - .baseHttpPort(9200) - .baseTransportPort(9300) - .numOfNode(1) - .disableESLogger() - ) - runner.ensureYellow() - runner - } -} diff --git a/doc-examples/src/main/scala/playground/elastic/ElasticsearchMock.scala b/doc-examples/src/main/scala/playground/elastic/ElasticsearchMock.scala deleted file mode 100644 index 5baaa6a0e7..0000000000 --- a/doc-examples/src/main/scala/playground/elastic/ElasticsearchMock.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package playground.elastic - -import akka.stream.Materializer -import akka.stream.alpakka.slick.javadsl.SlickSession -import akka.stream.alpakka.slick.scaladsl.Slick -import akka.stream.scaladsl.{Sink, Source} - -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.language.postfixOps - -object ElasticsearchMock { - - def populateDataForTable()(implicit session: SlickSession, materializer: Materializer) = { - - import session.profile.api._ - - //Drop table if already exists - val dropTableFut = - sqlu"""drop table if exists MOVIE""" - - //Create movie table - val createTableFut = - sqlu"""create table MOVIE (ID INT PRIMARY KEY, TITLE varchar, GENRE varchar, GROSS numeric(10,2))""" - - Await.result(session.db.run(dropTableFut), 10 seconds) - Await.result(session.db.run(createTableFut), 10 seconds) - - //A class just for organizing the data before using it in the insert clause. Could have been insertFut with a Tuple too - case class MovieInsert(id: Int, title: String, genre: String, gross: Double) - - val movies = List( - MovieInsert(1, "Rogue One", "Adventure", 3.032), - MovieInsert(2, "Beauty and the Beast", "Musical", 2.795), - MovieInsert(3, "Wonder Woman", "Action", 2.744), - MovieInsert(4, "Guardians of the Galaxy", "Action", 2.568), - MovieInsert(5, "Moana", "Musical", 2.493), - MovieInsert(6, "Spider-Man", "Action", 1.784) - ) - - Source(movies) - .via( - Slick.flow( - movie => sqlu"INSERT INTO MOVIE VALUES (${movie.id}, ${movie.title}, ${movie.genre}, ${movie.gross})" - ) - ) - .runWith(Sink.foreach(println)) - } -} diff --git a/docs/src/main/paradox/examples/elasticsearch-samples.md b/docs/src/main/paradox/examples/elasticsearch-samples.md index 8278fbe8d4..b8aecd46c1 100644 --- a/docs/src/main/paradox/examples/elasticsearch-samples.md +++ b/docs/src/main/paradox/examples/elasticsearch-samples.md @@ -2,38 +2,9 @@ ### Example: Index all data from an RDBMS table into Elasticsearch -- Instantiate a Slick database session using the config parameters defined in key `slick-h2-mem` -and mount closing it on shutdown of the Actor System (1) -- Scala only: Slick definition of the MOVIE table (2) -- Class that holds the Movie data (3) -- Instantiate Elastic REST client (4) -- Scala: Instantiate the Spray json format that converts the `Movie` case class to json (5) -- Java: Instantiate the Jackson Object mapper that converts the `Movie` class to json (5) -- Construct the Slick `Source` for the H2 table and query all data in the table (6) -- Scala only: Map each tuple into a `Movie` case class instance (7) -- The first argument of the `IncomingMessage` is the *id* of the document. Replace with `None` if you would Elastic to generate one (8) -- Prepare the Elastic `Sink` that the data needs to be drained to (9) -- Close the Elastic client upon completion of indexing the data (10) - -Scala -: @@snip [snip](/doc-examples/src/main/scala/elastic/FetchUsingSlickAndStreamIntoElastic.scala) { #sample } - -Java -: @@snip [snip](/doc-examples/src/main/java/elastic/FetchUsingSlickAndStreamIntoElasticInJava.java) { #sample } +This example is now available in the [Alpakka Samples](https://akka.io/alpakka-samples/jdbc-to-elasticsearch/) project. ### Example: Read from a Kafka topic and publish to Elasticsearch This example is now available in the [Alpakka Samples](https://akka.io/alpakka-samples/kafka-to-elasticsearch/) project. - -### Running the example code - -This example is contained in a stand-alone runnable main, it can be run - from `sbt` like this: - - -Scala -: ``` - sbt - > doc-examples/run - ``` diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7ac5602baf..6f0a03cd60 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -90,10 +90,6 @@ object Dependencies { ) ) - val Csv = Seq( - libraryDependencies ++= Seq() - ) - val `Doc-examples` = Seq( libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-discovery" % AkkaVersion, @@ -106,14 +102,11 @@ object Dependencies { "com.typesafe.akka" %% "akka-http-xml" % AkkaHttpVersion, "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion, // https://github.com/akka/alpakka-kafka/releases - "com.typesafe.akka" %% "akka-stream-kafka" % "1.0.3", + "com.typesafe.akka" %% "akka-stream-kafka" % "1.0.4", // https://github.com/javaee/javax.jms "javax.jms" % "jms" % "1.1", // CDDL Version 1.1 // http://activemq.apache.org/download.html "org.apache.activemq" % "activemq-all" % "5.15.4" exclude ("log4j", "log4j") exclude ("org.slf4j", "slf4j-log4j12"), // ApacheV2 - "com.h2database" % "h2" % "1.4.197", // Eclipse Public License 1.0 - "org.elasticsearch.client" % "elasticsearch-rest-client" % "6.3.1", // ApacheV2 - "org.codelibs" % "elasticsearch-cluster-runner" % "6.3.1.0", // ApacheV2 "io.netty" % "netty-all" % "4.1.29.Final", // ApacheV2 "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.9.9", "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.9.9",