diff --git a/build.sbt b/build.sbt index 5444c69206..1c0b7bd1fd 100644 --- a/build.sbt +++ b/build.sbt @@ -8,6 +8,7 @@ lazy val alpakka = project cassandra, csv, dynamodb, + elasticsearch, files, ftp, geode, @@ -88,6 +89,13 @@ lazy val dynamodb = project Dependencies.DynamoDB ) +lazy val elasticsearch = project + .enablePlugins(AutomateHeaderPlugin) + .settings( + name := "akka-stream-alpakka-elasticsearch", + Dependencies.Elasticsearch + ) + lazy val files = project // The name file is taken by `sbt.file`! .in(file("file")) .enablePlugins(AutomateHeaderPlugin) diff --git a/docs/src/main/paradox/connectors.md b/docs/src/main/paradox/connectors.md index e978af1e84..ab81f2b186 100644 --- a/docs/src/main/paradox/connectors.md +++ b/docs/src/main/paradox/connectors.md @@ -12,6 +12,7 @@ * [AWS SQS Connector](sqs.md) * [Azure Storage Queue](azure-storage-queue.md) * [Cassandra Connector](cassandra.md) +* [Elasticsearch Connectors](elasticsearch.md) * [File Connectors](file.md) * [FTP Connector](ftp.md) * [Google Cloud Pub/Sub Connector](google-cloud-pub-sub.md) diff --git a/docs/src/main/paradox/elasticsearch.md b/docs/src/main/paradox/elasticsearch.md new file mode 100644 index 0000000000..bebf32a27f --- /dev/null +++ b/docs/src/main/paradox/elasticsearch.md @@ -0,0 +1,130 @@ +# Elasticsearch Connector + +The Elasticsearch connector provides Akka Stream sources and sinks for Elasticsearch. + +For more information about Elasticsearch please visit the [official documentation](https://www.elastic.co/guide/index.html). + +## Artifacts + +sbt +: @@@vars + ```scala + libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "$version$" + ``` + @@@ + +Maven +: @@@vars + ```xml + + com.lightbend.akka + akka-stream-alpakka-elasticsearch_$scala.binaryVersion$ + $version$ + + ``` + @@@ + +Gradle +: @@@vars + ```gradle + dependencies { + compile group: "com.lightbend.akka", name: "akka-stream-alpakka-elasticsearch_$scala.binaryVersion$", version: "$version$" + } + ``` + @@@ + +## Usage + +Sources, Flows and Sinks provided by this connector need a prepared `RestClient` to access to Elasticsearch. + +Scala +: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #init-client } + +Java +: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #init-client } + +We will also need an @scaladoc[ActorSystem](akka.actor.ActorSystem) and an @scaladoc[ActorMaterializer](akka.stream.ActorMaterializer). + +Scala +: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #init-mat } + +Java +: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #init-mat } + +This is all preparation that we are going to need. + +### JsObject message + +Now we can stream messages which contains spray-json's `JsObject` (in Scala) or `java.util.Map` (in Java) +from or to Elasticsearch where we have access to by providing the `RestClient` to the +@scaladoc[ElasticsearchSource](akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSource$) or the +@scaladoc[ElasticsearchSink](akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink$). 
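+
+For reference, a condensed sketch of such a copy stream, adapted from the `#run-jsobject` test
+snippet later in this PR (it assumes the implicit `RestClient`, `ActorSystem` and
+`ActorMaterializer` prepared above, plus spray-json's `JsObject` in scope; index and type names
+are the ones used by the test):
+
+```scala
+val copying =
+  ElasticsearchSource("source", "book", """{"match_all": {}}""", ElasticsearchSourceSettings(5))
+    .map { message: OutgoingMessage[JsObject] =>
+      IncomingMessage(Some(message.id), message.source)
+    }
+    .runWith(ElasticsearchSink("sink1", "book", ElasticsearchSinkSettings(5)))
+```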
+ +Scala +: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #run-jsobject } + +Java +: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #run-jsobject } + + +### Typed messages + +Also, it's possible to stream messages which contains any classes. In Scala, spray-json is used for JSON conversion, +so defining the mapped class and `JsonFormat` for it is necessary. In Java, Jackson is used, so just define the mapped class. + +Scala +: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #define-class } + +Java +: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #define-class } + + +Use `ElasticsearchSource.typed` and `ElasticsearchSink.typed` to create source and sink instead. + +Scala +: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #run-typed } + +Java +: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #run-typed } + + +### Configuration + +We can specify the buffer size for the source. + +Scala (source) +: @@snip (../../../../elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSourceSettings.scala) { #source-settings } + +Also, we can specify the buffer size, the max retry count and the retry interval for the sink. + +Scala (sink) +: @@snip (../../../../elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSinkSettings.scala) { #sink-settings } + +`ElasticsearchSource` retrieves messages from Elasticsearch by scroll scan. This buffer size is used as the scroll size. +`ElasticsearchSink` puts messages by one bulk request per messages of this buffer size. + +### Using Elasticsearch as a Flow + +You can also build flow stages. The API is similar to creating Sinks. + +Scala (flow) +: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #run-flow } + +Java (flow) +: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #run-flow } + +### Running the example code + +The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt. + +Scala +: ``` + sbt + > elasticsearch/testOnly *.ElasticsearchSpec + ``` + +Java +: ``` + sbt + > elasticsearch/testOnly *.ElasticsearchTest + ``` diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchFlowStage.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchFlowStage.scala new file mode 100644 index 0000000000..97af850072 --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchFlowStage.scala @@ -0,0 +1,187 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. 
+ */ +package akka.stream.alpakka.elasticsearch + +import akka.stream.{Attributes, FlowShape, Inlet, Outlet} +import akka.stream.stage._ +import org.apache.http.entity.StringEntity +import org.elasticsearch.client.{Response, ResponseListener, RestClient} + +import scala.collection.mutable +import scala.collection.JavaConverters._ +import spray.json._ + +import scala.concurrent.Future +import scala.concurrent.duration._ +import ElasticsearchFlowStage._ +import akka.NotUsed +import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSinkSettings +import org.apache.http.message.BasicHeader +import org.apache.http.util.EntityUtils + +final case class IncomingMessage[T](id: Option[String], source: T) + +trait MessageWriter[T] { + def convert(message: T): String +} + +class ElasticsearchFlowStage[T, R]( + indexName: String, + typeName: String, + client: RestClient, + settings: ElasticsearchSinkSettings, + pusher: Seq[IncomingMessage[T]] => R, + writer: MessageWriter[T] +) extends GraphStage[FlowShape[IncomingMessage[T], Future[R]]] { + + private val in = Inlet[IncomingMessage[T]]("messages") + private val out = Outlet[Future[R]]("failed") + override val shape = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) with InHandler with OutHandler { + + private var state: State = Idle + private val queue = new mutable.Queue[IncomingMessage[T]]() + private val failureHandler = getAsyncCallback[(Seq[IncomingMessage[T]], Throwable)](handleFailure) + private val responseHandler = getAsyncCallback[(Seq[IncomingMessage[T]], Response)](handleResponse) + private var failedMessages: Seq[IncomingMessage[T]] = Nil + private var retryCount: Int = 0 + + override def preStart(): Unit = + pull(in) + + private def tryPull(): Unit = + if (queue.size < settings.bufferSize && !isClosed(in) && !hasBeenPulled(in)) { + pull(in) + } + + override def onTimer(timerKey: Any): Unit = { + sendBulkUpdateRequest(failedMessages) + failedMessages = Nil + } + + private def handleFailure(args: (Seq[IncomingMessage[T]], Throwable)): Unit = { + val (messages, exception) = args + if (retryCount >= settings.maxRetry) { + failStage(exception) + } else { + retryCount = retryCount + 1 + failedMessages = messages + scheduleOnce(NotUsed, settings.retryInterval.millis) + } + } + + private def handleSuccess(): Unit = + completeStage() + + private def handleResponse(args: (Seq[IncomingMessage[T]], Response)): Unit = { + retryCount = 0 + val (messages, response) = args + val responseJson = EntityUtils.toString(response.getEntity).parseJson + + // If some commands in bulk request failed, pass failed messages to follows. 
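+        // A bulk response carries an "items" array with one entry per command; an entry's
+        // "result" is "created" or "updated" on success, so any other value is collected below
+        // and emitted downstream as a failed message.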
+ val items = responseJson.asJsObject.fields("items").asInstanceOf[JsArray] + val failedMessages = items.elements.zip(messages).flatMap { + case (item, message) => + val result = item.asJsObject.fields("index").asJsObject.fields("result").asInstanceOf[JsString].value + if (result == "created" || result == "updated") { + None + } else { + Some(message) + } + } + + // Fetch next messages from queue and send them + val nextMessages = (1 to settings.bufferSize).flatMap { _ => + queue.dequeueFirst(_ => true) + } + + if (nextMessages.isEmpty) { + state match { + case Finished => handleSuccess() + case _ => state = Idle + } + } else { + sendBulkUpdateRequest(nextMessages) + } + + push(out, Future.successful(pusher(failedMessages))) + } + + private def sendBulkUpdateRequest(messages: Seq[IncomingMessage[T]]): Unit = { + val json = messages + .map { message => + JsObject( + "index" -> JsObject( + Seq( + Option("_index" -> JsString(indexName)), + Option("_type" -> JsString(typeName)), + message.id.map { id => + "_id" -> JsString(id) + } + ).flatten: _* + ) + ).toString + "\n" + writer.convert(message.source) + } + .mkString("", "\n", "\n") + + client.performRequestAsync( + "POST", + "/_bulk", + Map[String, String]().asJava, + new StringEntity(json), + new ResponseListener() { + override def onFailure(exception: Exception): Unit = + failureHandler.invoke((messages, exception)) + override def onSuccess(response: Response): Unit = + responseHandler.invoke((messages, response)) + }, + new BasicHeader("Content-Type", "application/x-ndjson") + ) + } + + setHandlers(in, out, this) + + override def onPull(): Unit = tryPull() + + override def onPush(): Unit = { + val message = grab(in) + queue.enqueue(message) + + state match { + case Idle => { + state = Sending + val messages = (1 to settings.bufferSize).flatMap { _ => + queue.dequeueFirst(_ => true) + } + sendBulkUpdateRequest(messages) + } + case _ => () + } + + tryPull() + } + + override def onUpstreamFailure(exception: Throwable): Unit = + failStage(exception) + + override def onUpstreamFinish(): Unit = + state match { + case Idle => handleSuccess() + case Sending => state = Finished + case Finished => () + } + } + +} + +object ElasticsearchFlowStage { + + private sealed trait State + private case object Idle extends State + private case object Sending extends State + private case object Finished extends State + +} diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSourceStage.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSourceStage.scala new file mode 100644 index 0000000000..14531d6388 --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSourceStage.scala @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. 
+ */ +package akka.stream.alpakka.elasticsearch + +import java.io.ByteArrayOutputStream + +import akka.stream.{Attributes, Outlet, SourceShape} +import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler} +import org.apache.http.entity.StringEntity +import org.elasticsearch.client.{Response, ResponseListener, RestClient} +import spray.json._ +import DefaultJsonProtocol._ +import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSourceSettings +import org.apache.http.message.BasicHeader + +import scala.collection.JavaConverters._ + +final case class OutgoingMessage[T](id: String, source: T) + +case class ScrollResponse[T](error: Option[String], result: Option[ScrollResult[T]]) +case class ScrollResult[T](scrollId: String, messages: Seq[OutgoingMessage[T]]) + +trait MessageReader[T] { + def convert(json: String): ScrollResponse[T] +} + +final class ElasticsearchSourceStage[T](indexName: String, + typeName: String, + query: String, + client: RestClient, + settings: ElasticsearchSourceSettings, + reader: MessageReader[T]) + extends GraphStage[SourceShape[OutgoingMessage[T]]] { + + val out: Outlet[OutgoingMessage[T]] = Outlet("ElasticsearchSource.out") + override val shape: SourceShape[OutgoingMessage[T]] = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new ElasticsearchSourceLogic[T](indexName, typeName, query, client, settings, out, shape, reader) + +} + +sealed class ElasticsearchSourceLogic[T](indexName: String, + typeName: String, + query: String, + client: RestClient, + settings: ElasticsearchSourceSettings, + out: Outlet[OutgoingMessage[T]], + shape: SourceShape[OutgoingMessage[T]], + reader: MessageReader[T]) + extends GraphStageLogic(shape) + with ResponseListener + with OutHandler { + + private var scrollId: String = null + private val responseHandler = getAsyncCallback[Response](handleResponse) + private val failureHandler = getAsyncCallback[Throwable](handleFailure) + + def sendScrollScanRequest(): Unit = + try { + if (scrollId == null) { + client.performRequestAsync( + "POST", + s"/$indexName/$typeName/_search", + Map("scroll" -> "5m", "sort" -> "_doc").asJava, + new StringEntity(s"""{"size": ${settings.bufferSize}, "query": ${query}}"""), + this, + new BasicHeader("Content-Type", "application/json") + ) + } else { + client.performRequestAsync( + "POST", + s"/_search/scroll", + Map[String, String]().asJava, + new StringEntity(Map("scroll" -> "5m", "scroll_id" -> scrollId).toJson.toString), + this, + new BasicHeader("Content-Type", "application/json") + ) + } + } catch { + case ex: Exception => handleFailure(ex) + } + + override def onFailure(exception: Exception) = failureHandler.invoke(exception) + override def onSuccess(response: Response) = responseHandler.invoke(response) + + def handleFailure(ex: Throwable): Unit = + failStage(ex) + + def handleResponse(res: Response): Unit = { + val json = { + val out = new ByteArrayOutputStream() + try { + res.getEntity.writeTo(out) + new String(out.toByteArray, "UTF-8") + } finally { + out.close() + } + } + + reader.convert(json) match { + case ScrollResponse(Some(error), _) => + failStage(new IllegalStateException(error)) + case ScrollResponse(None, Some(result)) if result.messages.isEmpty => + completeStage() + case ScrollResponse(_, Some(result)) => + scrollId = result.scrollId + emitMultiple(out, result.messages.toIterator) + } + } + + setHandler(out, this) + + override def onPull(): Unit = sendScrollScanRequest() + +} diff --git 
a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchFlow.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchFlow.scala new file mode 100644 index 0000000000..331148b39e --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchFlow.scala @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.elasticsearch.javadsl + +import akka.NotUsed +import akka.stream.alpakka.elasticsearch.{ElasticsearchFlowStage, IncomingMessage, MessageWriter} +import akka.stream.scaladsl.Flow +import com.fasterxml.jackson.databind.ObjectMapper +import org.elasticsearch.client.RestClient + +import scala.collection.JavaConverters._ +import java.util.{List => JavaList, Map => JavaMap} + +object ElasticsearchFlow { + + /** + * Java API: creates a [[ElasticsearchFlowStage]] that accepts as JsObject + */ + def create( + indexName: String, + typeName: String, + settings: ElasticsearchSinkSettings, + client: RestClient + ): akka.stream.javadsl.Flow[IncomingMessage[JavaMap[String, Object]], JavaList[ + IncomingMessage[JavaMap[String, Object]] + ], NotUsed] = + Flow + .fromGraph( + new ElasticsearchFlowStage[JavaMap[String, Object], JavaList[IncomingMessage[JavaMap[String, Object]]]]( + indexName, + typeName, + client, + settings.asScala, + _.asJava, + new JacksonWriter[JavaMap[String, Object]]() + ) + ) + .mapAsync(1)(identity) + .asJava + + /** + * Java API: creates a [[ElasticsearchFlowStage]] that accepts specific type + */ + def typed[T]( + indexName: String, + typeName: String, + settings: ElasticsearchSinkSettings, + client: RestClient + ): akka.stream.javadsl.Flow[IncomingMessage[T], JavaList[IncomingMessage[T]], NotUsed] = + Flow + .fromGraph( + new ElasticsearchFlowStage[T, JavaList[IncomingMessage[T]]](indexName, + typeName, + client, + settings.asScala, + _.asJava, + new JacksonWriter[T]()) + ) + .mapAsync(1)(identity) + .asJava + + private class JacksonWriter[T] extends MessageWriter[T] { + + private val mapper = new ObjectMapper() + + override def convert(message: T): String = + mapper.writeValueAsString(message) + } + +} diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSink.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSink.scala new file mode 100644 index 0000000000..2d2c50f9a6 --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSink.scala @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. 
+ */ +package akka.stream.alpakka.elasticsearch.javadsl + +import java.util.concurrent.CompletionStage + +import akka.{Done, NotUsed} +import akka.stream.alpakka.elasticsearch._ +import akka.stream.javadsl._ +import org.elasticsearch.client.RestClient + +object ElasticsearchSink { + + /** + * Java API: creates a sink based on [[ElasticsearchFlowStage]] that accepts as JsObject + */ + def create( + indexName: String, + typeName: String, + settings: ElasticsearchSinkSettings, + client: RestClient + ): akka.stream.javadsl.Sink[IncomingMessage[java.util.Map[String, Object]], CompletionStage[Done]] = + ElasticsearchFlow + .create(indexName, typeName, settings, client) + .toMat(Sink.ignore, Keep.right[NotUsed, CompletionStage[Done]]) + + /** + * Java API: creates a sink based on [[ElasticsearchFlowStage]] that accepts as specific type + */ + def typed[T](indexName: String, + typeName: String, + settings: ElasticsearchSinkSettings, + client: RestClient): akka.stream.javadsl.Sink[IncomingMessage[T], CompletionStage[Done]] = + ElasticsearchFlow + .typed(indexName, typeName, settings, client) + .toMat(Sink.ignore, Keep.right[NotUsed, CompletionStage[Done]]) + +} diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSinkSettings.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSinkSettings.scala new file mode 100644 index 0000000000..627d3cf84c --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSinkSettings.scala @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.elasticsearch.javadsl + +import akka.stream.alpakka.elasticsearch._ +import scaladsl.{ElasticsearchSinkSettings => ScalaElasticsearchSinkSettings} + +final class ElasticsearchSinkSettings(val bufferSize: Int, val retryInterval: Int, val maxRetry: Int) { + + def this() = this(10, 5000, 100) + + def withBufferSize(bufferSize: Int): ElasticsearchSinkSettings = + new ElasticsearchSinkSettings(bufferSize, this.retryInterval, this.maxRetry) + + def withRetryInterval(retryInterval: Int): ElasticsearchSinkSettings = + new ElasticsearchSinkSettings(this.bufferSize, retryInterval, this.maxRetry) + + def withMaxRetry(maxRetry: Int): ElasticsearchSinkSettings = + new ElasticsearchSinkSettings(this.bufferSize, this.retryInterval, maxRetry) + + private[javadsl] def asScala: ScalaElasticsearchSinkSettings = + ScalaElasticsearchSinkSettings( + bufferSize = this.bufferSize, + retryInterval = this.retryInterval, + maxRetry = this.maxRetry + ) + +} diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSource.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSource.scala new file mode 100644 index 0000000000..ffa449afb8 --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSource.scala @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. 
+ */ +package akka.stream.alpakka.elasticsearch.javadsl + +import akka.NotUsed +import akka.stream.alpakka.elasticsearch._ +import akka.stream.javadsl.Source +import org.elasticsearch.client.RestClient +import com.fasterxml.jackson.databind.ObjectMapper +import scala.collection.JavaConverters._ + +object ElasticsearchSource { + + /** + * Java API: creates a [[ElasticsearchSourceStage]] that consumes as java.util.Map + */ + def create(indexName: String, + typeName: String, + query: String, + settings: ElasticsearchSourceSettings, + client: RestClient): Source[OutgoingMessage[java.util.Map[String, Object]], NotUsed] = + Source.fromGraph( + new ElasticsearchSourceStage( + indexName, + typeName, + query, + client, + settings.asScala, + new JacksonReader[java.util.Map[String, Object]](classOf[java.util.Map[String, Object]]) + ) + ) + + /** + * Java API: creates a [[ElasticsearchSourceStage]] that consumes as specific type + */ + def typed[T](indexName: String, + typeName: String, + query: String, + settings: ElasticsearchSourceSettings, + client: RestClient, + clazz: Class[T]): Source[OutgoingMessage[T], NotUsed] = + Source.fromGraph( + new ElasticsearchSourceStage( + indexName, + typeName, + query, + client, + settings.asScala, + new JacksonReader[T](clazz) + ) + ) + + private class JacksonReader[T](clazz: Class[T]) extends MessageReader[T] { + + private val mapper = new ObjectMapper() + + override def convert(json: String): ScrollResponse[T] = { + val map = mapper.readValue(json, classOf[java.util.Map[String, Object]]) + val error = map.get("error") + if (error != null) { + ScrollResponse(Some(error.toString), None) + } else { + val scrollId = map.get("_scroll_id").asInstanceOf[String] + val hits = map + .get("hits") + .asInstanceOf[java.util.Map[String, Object]] + .get("hits") + .asInstanceOf[java.util.List[java.util.Map[String, Object]]] + val messages = hits.asScala.map { element => + val id = element.get("_id").asInstanceOf[String] + val source = element.get("_source").asInstanceOf[java.util.Map[String, Object]] + if (clazz.isAssignableFrom(classOf[java.util.Map[String, Object]])) { + OutgoingMessage[T](id, source.asInstanceOf[T]) + } else { + val obj = mapper.readValue(mapper.writeValueAsString(source), clazz) + OutgoingMessage[T](id, obj) + } + } + ScrollResponse(None, Some(ScrollResult(scrollId, messages))) + } + } + } + +} diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSourceSettings.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSourceSettings.scala new file mode 100644 index 0000000000..1e56f995d0 --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSourceSettings.scala @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. 
+ */ +package akka.stream.alpakka.elasticsearch.javadsl + +import akka.stream.alpakka.elasticsearch._ +import scaladsl.{ElasticsearchSourceSettings => ScalaElasticsearchSourceSettings} + +final class ElasticsearchSourceSettings(val bufferSize: Int) { + + def this() = this(10) + + def withBufferSize(bufferSize: Int): ElasticsearchSourceSettings = + new ElasticsearchSourceSettings(bufferSize) + + private[javadsl] def asScala: ScalaElasticsearchSourceSettings = + ScalaElasticsearchSourceSettings( + bufferSize = this.bufferSize + ) +} diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala new file mode 100644 index 0000000000..74d79352d1 --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.elasticsearch.scaladsl + +import akka.NotUsed +import akka.stream.alpakka.elasticsearch.{ElasticsearchFlowStage, IncomingMessage, MessageWriter} +import akka.stream.scaladsl.Flow +import org.elasticsearch.client.RestClient +import spray.json._ + +object ElasticsearchFlow { + + /** + * Scala API: creates a [[ElasticsearchFlowStage]] that accepts as JsObject + */ + def apply(indexName: String, typeName: String, settings: ElasticsearchSinkSettings)( + implicit client: RestClient + ): Flow[IncomingMessage[JsObject], Seq[IncomingMessage[JsObject]], NotUsed] = + Flow + .fromGraph( + new ElasticsearchFlowStage[JsObject, Seq[IncomingMessage[JsObject]]]( + indexName, + typeName, + client, + settings, + identity, + new SprayJsonWriter[JsObject]()(DefaultJsonProtocol.RootJsObjectFormat) + ) + ) + .mapAsync(1)(identity) + + /** + * Scala API: creates a [[ElasticsearchFlowStage]] that accepts specific type + */ + def typed[T](indexName: String, typeName: String, settings: ElasticsearchSinkSettings)( + implicit client: RestClient, + writer: JsonWriter[T] + ): Flow[IncomingMessage[T], Seq[IncomingMessage[T]], NotUsed] = + Flow + .fromGraph( + new ElasticsearchFlowStage[T, Seq[IncomingMessage[T]]](indexName, + typeName, + client, + settings, + identity, + new SprayJsonWriter[T]()(writer)) + ) + .mapAsync(1)(identity) + + private class SprayJsonWriter[T](implicit writer: JsonWriter[T]) extends MessageWriter[T] { + override def convert(message: T): String = message.toJson.toString() + } + +} diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSink.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSink.scala new file mode 100644 index 0000000000..e878070efa --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSink.scala @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. 
+ */ +package akka.stream.alpakka.elasticsearch.scaladsl + +import akka.Done +import akka.stream.alpakka.elasticsearch.{ElasticsearchFlowStage, IncomingMessage} +import akka.stream.scaladsl.{Keep, Sink} +import org.elasticsearch.client.RestClient +import spray.json.{JsObject, JsonWriter} + +import scala.concurrent.Future + +object ElasticsearchSink { + + /** + * Scala API: creates a sink based on [[ElasticsearchFlowStage]] that accepts as JsObject + */ + def apply(indexName: String, typeName: String, settings: ElasticsearchSinkSettings)( + implicit client: RestClient + ): Sink[IncomingMessage[JsObject], Future[Done]] = + ElasticsearchFlow.apply(indexName, typeName, settings).toMat(Sink.ignore)(Keep.right) + + /** + * Scala API: creates a sink based on [[ElasticsearchFlowStage]] that accepts as specific type + */ + def typed[T](indexName: String, typeName: String, settings: ElasticsearchSinkSettings)( + implicit client: RestClient, + writer: JsonWriter[T] + ): Sink[IncomingMessage[T], Future[Done]] = + ElasticsearchFlow.typed[T](indexName, typeName, settings).toMat(Sink.ignore)(Keep.right) + +} diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSinkSettings.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSinkSettings.scala new file mode 100644 index 0000000000..a14487baa4 --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSinkSettings.scala @@ -0,0 +1,8 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.elasticsearch.scaladsl + +//#sink-settings +final case class ElasticsearchSinkSettings(bufferSize: Int = 10, retryInterval: Int = 5000, maxRetry: Int = 100) +//#sink-settings diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSource.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSource.scala new file mode 100644 index 0000000000..f7cc2eaf51 --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSource.scala @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. 
+ */ +package akka.stream.alpakka.elasticsearch.scaladsl + +import akka.NotUsed +import akka.stream.alpakka.elasticsearch._ +import akka.stream.scaladsl.Source +import org.elasticsearch.client.RestClient +import spray.json._ +import DefaultJsonProtocol._ + +object ElasticsearchSource { + + /** + * Scala API: creates a [[ElasticsearchSourceStage]] that consumes as JsObject + */ + def apply(indexName: String, typeName: String, query: String, settings: ElasticsearchSourceSettings)( + implicit client: RestClient + ): Source[OutgoingMessage[JsObject], NotUsed] = + Source.fromGraph( + new ElasticsearchSourceStage( + indexName, + typeName, + query, + client, + settings, + new SprayJsonReader[JsObject]()(DefaultJsonProtocol.RootJsObjectFormat) + ) + ) + + /** + * Scala API: creates a [[ElasticsearchSourceStage]] that consumes as specific type + */ + def typed[T](indexName: String, typeName: String, query: String, settings: ElasticsearchSourceSettings)( + implicit client: RestClient, + reader: JsonReader[T] + ): Source[OutgoingMessage[T], NotUsed] = + Source.fromGraph( + new ElasticsearchSourceStage(indexName, typeName, query, client, settings, new SprayJsonReader[T]()(reader)) + ) + + private class SprayJsonReader[T](implicit reader: JsonReader[T]) extends MessageReader[T] { + + override def convert(json: String): ScrollResponse[T] = { + val jsObj = json.parseJson.asJsObject + jsObj.fields.get("error") match { + case Some(error) => { + ScrollResponse(Some(error.toString), None) + } + case None => { + val scrollId = jsObj.fields("_scroll_id").asInstanceOf[JsString].value + val hits = jsObj.fields("hits").asJsObject.fields("hits").asInstanceOf[JsArray] + val messages = hits.elements.reverse.map { element => + val doc = element.asJsObject + val id = doc.fields("_id").asInstanceOf[JsString].value + val source = doc.fields("_source").asJsObject + OutgoingMessage(id, source.convertTo[T]) + } + ScrollResponse(None, Some(ScrollResult(scrollId, messages))) + } + } + } + + } + +} diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSourceSettings.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSourceSettings.scala new file mode 100644 index 0000000000..1f77050f26 --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSourceSettings.scala @@ -0,0 +1,8 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.stream.alpakka.elasticsearch.scaladsl + +//#source-settings +final case class ElasticsearchSourceSettings(bufferSize: Int = 10) +//#source-settings diff --git a/elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java b/elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java new file mode 100644 index 0000000000..a0616a7f97 --- /dev/null +++ b/elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java @@ -0,0 +1,243 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. 
+ */ +package akka.stream.alpakka.elasticsearch; + +import akka.Done; +import akka.actor.ActorSystem; +import akka.stream.ActorMaterializer; +import akka.stream.alpakka.elasticsearch.javadsl.*; +import akka.stream.javadsl.Sink; +import akka.testkit.JavaTestKit; +import org.apache.http.HttpHost; +import org.apache.http.entity.StringEntity; +import org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner; +import org.elasticsearch.client.RestClient; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import scala.Some; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CompletionStage; + +import static org.junit.Assert.assertEquals; + +public class ElasticsearchTest { + + private static ElasticsearchClusterRunner runner; + private static RestClient client; + private static ActorSystem system; + private static ActorMaterializer materializer; + + //#define-class + public static class Book { + public String title; + } + //#define-class + + @BeforeClass + public static void setup() throws IOException { + runner = new ElasticsearchClusterRunner(); + runner.build(ElasticsearchClusterRunner.newConfigs().baseHttpPort(9210).baseTransportPort(9310).numOfNode(1)); + runner.ensureYellow(); + + //#init-client + client = RestClient.builder(new HttpHost("localhost", 9211)).build(); + //#init-client + + //#init-mat + system = ActorSystem.create(); + materializer = ActorMaterializer.create(system); + //#init-mat + + + register("source", "Akka in Action"); + register("source", "Programming in Scala"); + register("source", "Learning Scala"); + register("source", "Scala for Spark in Production"); + register("source", "Scala Puzzlers"); + register("source", "Effective Akka"); + register("source", "Akka Concurrency"); + flush("source"); + } + + @AfterClass + public static void teardown() throws Exception { + runner.close(); + runner.clean(); + client.close(); + JavaTestKit.shutdownActorSystem(system); + } + + + private static void flush(String indexName) throws IOException { + client.performRequest("POST", indexName + "/_flush"); + } + + private static void register(String indexName, String title) throws IOException { + client.performRequest("POST", + indexName + "/book", + new HashMap<>(), + new StringEntity(String.format("{\"title\": \"%s\"}", title))); + } + + + @Test + public void jsObjectStream() throws Exception { + // Copy source/book to sink1/book through JsObject stream + //#run-jsobject + CompletionStage f1 = ElasticsearchSource.create( + "source", + "book", + "{\"match_all\": {}}", + new ElasticsearchSourceSettings().withBufferSize(5), + client) + .map(m -> new IncomingMessage<>(new Some(m.id()), m.source())) + .runWith( + ElasticsearchSink.create( + "sink1", + "book", + new ElasticsearchSinkSettings().withBufferSize(5), + client), + materializer); + //#run-jsobject + + f1.toCompletableFuture().get(); + + flush("sink1"); + + // Assert docs in sink1/book + CompletionStage> f2 = ElasticsearchSource.create( + "sink1", + "book", + "{\"match_all\": {}}", + new ElasticsearchSourceSettings(5), + client) + .map(m -> (String) m.source().get("title")) + .runWith(Sink.seq(), materializer); + + List result = new ArrayList<>(f2.toCompletableFuture().get()); + + List expect = Arrays.asList( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production" + ); + + Collections.sort(result); + assertEquals(expect, result); + } + + @Test + public void 
typedStream() throws Exception { + // Copy source/book to sink2/book through JsObject stream + //#run-typed + CompletionStage f1 = ElasticsearchSource.typed( + "source", + "book", + "{\"match_all\": {}}", + new ElasticsearchSourceSettings().withBufferSize(5), + client, + Book.class) + .map(m -> new IncomingMessage<>(new Some(m.id()), m.source())) + .runWith( + ElasticsearchSink.typed( + "sink2", + "book", + new ElasticsearchSinkSettings().withBufferSize(5), + client), + materializer); + //#run-typed + + f1.toCompletableFuture().get(); + + flush("sink2"); + + // Assert docs in sink2/book + CompletionStage> f2 = ElasticsearchSource.typed( + "sink2", + "book", + "{\"match_all\": {}}", + new ElasticsearchSourceSettings().withBufferSize(5), + client, + Book.class) + .map(m -> m.source().title) + .runWith(Sink.seq(), materializer); + + List result = new ArrayList<>(f2.toCompletableFuture().get()); + + List expect = Arrays.asList( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production" + ); + + Collections.sort(result); + assertEquals(expect, result); + } + + @Test + public void flow() throws Exception { + // Copy source/book to sink3/book through JsObject stream + //#run-flow + CompletionStage>>> f1 = ElasticsearchSource.typed( + "source", + "book", + "{\"match_all\": {}}", + new ElasticsearchSourceSettings().withBufferSize(5), + client, + Book.class) + .map(m -> new IncomingMessage<>(new Some(m.id()), m.source())) + .via(ElasticsearchFlow.typed( + "sink3", + "book", + new ElasticsearchSinkSettings().withBufferSize(5), + client)) + .runWith(Sink.seq(), materializer); + //#run-flow + + List>> result1 = f1.toCompletableFuture().get(); + flush("sink3"); + + assertEquals(2, result1.size()); + assertEquals(true, result1.get(0).isEmpty()); + assertEquals(true, result1.get(1).isEmpty()); + + // Assert docs in sink3/book + CompletionStage> f2 = ElasticsearchSource.typed( + "sink3", + "book", + "{\"match_all\": {}}", + new ElasticsearchSourceSettings().withBufferSize(5), + client, + Book.class) + .map(m -> m.source().title) + .runWith(Sink.seq(), materializer); + + List result2 = new ArrayList<>(f2.toCompletableFuture().get()); + + List expect = Arrays.asList( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production" + ); + + Collections.sort(result2); + assertEquals(expect, result2); + } + +} \ No newline at end of file diff --git a/elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala b/elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala new file mode 100644 index 0000000000..aee0055a0f --- /dev/null +++ b/elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala @@ -0,0 +1,232 @@ +/* + * Copyright (C) 2016-2017 Lightbend Inc. 
+ */ +package akka.stream.alpakka.elasticsearch + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.alpakka.elasticsearch.scaladsl._ +import akka.stream.scaladsl.Sink +import akka.testkit.TestKit +import org.apache.http.HttpHost +import org.apache.http.entity.StringEntity +import org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner +import org.elasticsearch.client.RestClient +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} +import spray.json.JsString + +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.collection.JavaConverters._ +import spray.json._ +import DefaultJsonProtocol._ + +class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { + + private val runner = new ElasticsearchClusterRunner() + + //#init-mat + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + //#init-mat + //#init-client + implicit val client = RestClient.builder(new HttpHost("localhost", 9201)).build() + //#init-client + + //#define-class + case class Book(title: String) + implicit val format = jsonFormat1(Book) + //#define-class + + override def beforeAll() = { + runner.build(ElasticsearchClusterRunner.newConfigs().baseHttpPort(9200).baseTransportPort(9300).numOfNode(1)) + runner.ensureYellow() + + register("source", "Akka in Action") + register("source", "Programming in Scala") + register("source", "Learning Scala") + register("source", "Scala for Spark in Production") + register("source", "Scala Puzzlers") + register("source", "Effective Akka") + register("source", "Akka Concurrency") + flush("source") + } + + override def afterAll() = { + runner.close() + runner.clean() + client.close() + TestKit.shutdownActorSystem(system) + } + + private def flush(indexName: String): Unit = + client.performRequest("POST", s"$indexName/_flush") + + private def register(indexName: String, title: String): Unit = + client.performRequest("POST", + s"$indexName/book", + Map[String, String]().asJava, + new StringEntity(s"""{"title": "$title"}""")) + + "Elasticsearch connector" should { + "consume and publish documents as JsObject" in { + // Copy source/book to sink1/book through JsObject stream + //#run-jsobject + val f1 = ElasticsearchSource( + "source", + "book", + """{"match_all": {}}""", + ElasticsearchSourceSettings(5) + ).map { message: OutgoingMessage[JsObject] => + IncomingMessage(Some(message.id), message.source) + } + .runWith( + ElasticsearchSink( + "sink1", + "book", + ElasticsearchSinkSettings(5) + ) + ) + //#run-jsobject + + Await.result(f1, Duration.Inf) + + flush("sink1") + + // Assert docs in sink1/book + val f2 = ElasticsearchSource( + "sink1", + "book", + """{"match_all": {}}""", + ElasticsearchSourceSettings() + ).map { message => + message.source.fields("title").asInstanceOf[JsString].value + } + .runWith(Sink.seq) + + val result = Await.result(f2, Duration.Inf) + + result.sorted shouldEqual Seq( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production" + ) + } + } + + "Typed Elasticsearch connector" should { + "consume and publish documents as specific type" in { + // Copy source/book to sink2/book through typed stream + //#run-typed + val f1 = ElasticsearchSource + .typed[Book]( + "source", + "book", + """{"match_all": {}}""", + ElasticsearchSourceSettings(5) + ) + .map { message: OutgoingMessage[Book] => + IncomingMessage(Some(message.id), message.source) 
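+          // reusing the id keeps the copied document under the same _id in the target index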
+ } + .runWith( + ElasticsearchSink.typed[Book]( + "sink2", + "book", + ElasticsearchSinkSettings(5) + ) + ) + //#run-typed + + Await.result(f1, Duration.Inf) + + flush("sink2") + + // Assert docs in sink2/book + val f2 = ElasticsearchSource + .typed[Book]( + "sink2", + "book", + """{"match_all": {}}""", + ElasticsearchSourceSettings() + ) + .map { message => + message.source.title + } + .runWith(Sink.seq) + + val result = Await.result(f2, Duration.Inf) + + result.sorted shouldEqual Seq( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production" + ) + } + } + + "ElasticsearchFlow" should { + "store documents and pass Responses" in { + // Copy source/book to sink3/book through typed stream + //#run-flow + val f1 = ElasticsearchSource + .typed[Book]( + "source", + "book", + """{"match_all": {}}""", + ElasticsearchSourceSettings(5) + ) + .map { message: OutgoingMessage[Book] => + IncomingMessage(Some(message.id), message.source) + } + .via( + ElasticsearchFlow.typed[Book]( + "sink3", + "book", + ElasticsearchSinkSettings(5) + ) + ) + .runWith(Sink.seq) + //#run-flow + + val result1 = Await.result(f1, Duration.Inf) + flush("sink3") + + // Assert no errors + assert(result1.forall(_.isEmpty)) + + // Assert docs in sink3/book + val f2 = ElasticsearchSource + .typed[Book]( + "sink3", + "book", + """{"match_all": {}}""", + ElasticsearchSourceSettings() + ) + .map { message => + message.source.title + } + .runWith(Sink.seq) + + val result2 = Await.result(f2, Duration.Inf) + + result2.sorted shouldEqual Seq( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production" + ) + } + } + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 867eb54baa..f056bdb407 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -52,6 +52,15 @@ object Dependencies { ) ) + val Elasticsearch = Seq( + libraryDependencies ++= Seq( + "org.elasticsearch.client" % "rest" % "5.5.3", // ApacheV2 + "io.spray" %% "spray-json" % "1.3.3", // ApacheV2 + "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.1", // ApacheV2 + "org.codelibs" % "elasticsearch-cluster-runner" % "5.6.0.0" % Test // ApacheV2 + ) + ) + val File = Seq( libraryDependencies ++= Seq( "com.google.jimfs" % "jimfs" % "1.1" % Test // ApacheV2