-
Notifications
You must be signed in to change notification settings - Fork 645
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
18 changed files
with
1,342 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
# Elasticsearch Connector | ||
|
||
The Elasticsearch connector provides Akka Stream sources and sinks for Elasticsearch. | ||
|
||
For more information about Elasticsearch please visit the [official documentation](https://www.elastic.co/guide/index.html). | ||
|
||
## Artifacts | ||
|
||
sbt | ||
: @@@vars | ||
```scala | ||
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "$version$" | ||
``` | ||
@@@ | ||
|
||
Maven | ||
: @@@vars | ||
```xml | ||
<dependency> | ||
<groupId>com.lightbend.akka</groupId> | ||
<artifactId>akka-stream-alpakka-elasticsearch_$scala.binaryVersion$</artifactId> | ||
<version>$version$</version> | ||
</dependency> | ||
``` | ||
@@@ | ||
|
||
Gradle | ||
: @@@vars | ||
```gradle | ||
dependencies { | ||
compile group: "com.lightbend.akka", name: "akka-stream-alpakka-elasticsearch_$scala.binaryVersion$", version: "$version$" | ||
} | ||
``` | ||
@@@ | ||
|
||
## Usage | ||
|
||
Sources, Flows and Sinks provided by this connector need a prepared `RestClient` to access to Elasticsearch. | ||
|
||
Scala | ||
: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #init-client } | ||
|
||
Java | ||
: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #init-client } | ||
|
||
We will also need an @scaladoc[ActorSystem](akka.actor.ActorSystem) and an @scaladoc[ActorMaterializer](akka.stream.ActorMaterializer). | ||
|
||
Scala | ||
: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #init-mat } | ||
|
||
Java | ||
: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #init-mat } | ||
|
||
This is all preparation that we are going to need. | ||
|
||
### JsObject message | ||
|
||
Now we can stream messages which contains spray-json's `JsObject` (in Scala) or `java.util.Map<String, Object>` (in Java) | ||
from or to Elasticsearch where we have access to by providing the `RestClient` to the | ||
@scaladoc[ElasticsearchSource](akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSource$) or the | ||
@scaladoc[ElasticsearchSink](akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink$). | ||
|
||
Scala | ||
: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #run-jsobject } | ||
|
||
Java | ||
: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #run-jsobject } | ||
|
||
|
||
### Typed messages | ||
|
||
Also, it's possible to stream messages which contains any classes. In Scala, spray-json is used for JSON conversion, | ||
so defining the mapped class and `JsonFormat` for it is necessary. In Java, Jackson is used, so just define the mapped class. | ||
|
||
Scala | ||
: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #define-class } | ||
|
||
Java | ||
: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #define-class } | ||
|
||
|
||
Use `ElasticsearchSource.typed` and `ElasticsearchSink.typed` to create source and sink instead. | ||
|
||
Scala | ||
: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #run-typed } | ||
|
||
Java | ||
: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #run-typed } | ||
|
||
|
||
### Configuration | ||
|
||
We can specify the buffer size for the source. | ||
|
||
Scala (source) | ||
: @@snip (../../../../elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSourceSettings.scala) { #source-settings } | ||
|
||
Also, we can specify the buffer size, the max retry count and the retry interval for the sink. | ||
|
||
Scala (sink) | ||
: @@snip (../../../../elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSinkSettings.scala) { #sink-settings } | ||
|
||
`ElasticsearchSource` retrieves messages from Elasticsearch by scroll scan. This buffer size is used as the scroll size. | ||
`ElasticsearchSink` puts messages by one bulk request per messages of this buffer size. | ||
|
||
### Using Elasticsearch as a Flow | ||
|
||
You can also build flow stages. The API is similar to creating Sinks. | ||
|
||
Scala (flow) | ||
: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #run-flow } | ||
|
||
Java (flow) | ||
: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #run-flow } | ||
|
||
### Running the example code | ||
|
||
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt. | ||
|
||
Scala | ||
: ``` | ||
sbt | ||
> elasticsearch/testOnly *.ElasticsearchSpec | ||
``` | ||
|
||
Java | ||
: ``` | ||
sbt | ||
> elasticsearch/testOnly *.ElasticsearchTest | ||
``` |
187 changes: 187 additions & 0 deletions
187
elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchFlowStage.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,187 @@ | ||
/* | ||
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com> | ||
*/ | ||
package akka.stream.alpakka.elasticsearch | ||
|
||
import akka.stream.{Attributes, FlowShape, Inlet, Outlet} | ||
import akka.stream.stage._ | ||
import org.apache.http.entity.StringEntity | ||
import org.elasticsearch.client.{Response, ResponseListener, RestClient} | ||
|
||
import scala.collection.mutable | ||
import scala.collection.JavaConverters._ | ||
import spray.json._ | ||
|
||
import scala.concurrent.Future | ||
import scala.concurrent.duration._ | ||
import ElasticsearchFlowStage._ | ||
import akka.NotUsed | ||
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSinkSettings | ||
import org.apache.http.message.BasicHeader | ||
import org.apache.http.util.EntityUtils | ||
|
||
final case class IncomingMessage[T](id: Option[String], source: T) | ||
|
||
trait MessageWriter[T] { | ||
def convert(message: T): String | ||
} | ||
|
||
class ElasticsearchFlowStage[T, R]( | ||
indexName: String, | ||
typeName: String, | ||
client: RestClient, | ||
settings: ElasticsearchSinkSettings, | ||
pusher: Seq[IncomingMessage[T]] => R, | ||
writer: MessageWriter[T] | ||
) extends GraphStage[FlowShape[IncomingMessage[T], Future[R]]] { | ||
|
||
private val in = Inlet[IncomingMessage[T]]("messages") | ||
private val out = Outlet[Future[R]]("failed") | ||
override val shape = FlowShape(in, out) | ||
|
||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = | ||
new TimerGraphStageLogic(shape) with InHandler with OutHandler { | ||
|
||
private var state: State = Idle | ||
private val queue = new mutable.Queue[IncomingMessage[T]]() | ||
private val failureHandler = getAsyncCallback[(Seq[IncomingMessage[T]], Throwable)](handleFailure) | ||
private val responseHandler = getAsyncCallback[(Seq[IncomingMessage[T]], Response)](handleResponse) | ||
private var failedMessages: Seq[IncomingMessage[T]] = Nil | ||
private var retryCount: Int = 0 | ||
|
||
override def preStart(): Unit = | ||
pull(in) | ||
|
||
private def tryPull(): Unit = | ||
if (queue.size < settings.bufferSize && !isClosed(in) && !hasBeenPulled(in)) { | ||
pull(in) | ||
} | ||
|
||
override def onTimer(timerKey: Any): Unit = { | ||
sendBulkUpdateRequest(failedMessages) | ||
failedMessages = Nil | ||
} | ||
|
||
private def handleFailure(args: (Seq[IncomingMessage[T]], Throwable)): Unit = { | ||
val (messages, exception) = args | ||
if (retryCount >= settings.maxRetry) { | ||
failStage(exception) | ||
} else { | ||
retryCount = retryCount + 1 | ||
failedMessages = messages | ||
scheduleOnce(NotUsed, settings.retryInterval.millis) | ||
} | ||
} | ||
|
||
private def handleSuccess(): Unit = | ||
completeStage() | ||
|
||
private def handleResponse(args: (Seq[IncomingMessage[T]], Response)): Unit = { | ||
retryCount = 0 | ||
val (messages, response) = args | ||
val responseJson = EntityUtils.toString(response.getEntity).parseJson | ||
|
||
// If some commands in bulk request failed, pass failed messages to follows. | ||
val items = responseJson.asJsObject.fields("items").asInstanceOf[JsArray] | ||
val failedMessages = items.elements.zip(messages).flatMap { | ||
case (item, message) => | ||
val result = item.asJsObject.fields("index").asJsObject.fields("result").asInstanceOf[JsString].value | ||
if (result == "created" || result == "updated") { | ||
None | ||
} else { | ||
Some(message) | ||
} | ||
} | ||
|
||
// Fetch next messages from queue and send them | ||
val nextMessages = (1 to settings.bufferSize).flatMap { _ => | ||
queue.dequeueFirst(_ => true) | ||
} | ||
|
||
if (nextMessages.isEmpty) { | ||
state match { | ||
case Finished => handleSuccess() | ||
case _ => state = Idle | ||
} | ||
} else { | ||
sendBulkUpdateRequest(nextMessages) | ||
} | ||
|
||
push(out, Future.successful(pusher(failedMessages))) | ||
} | ||
|
||
private def sendBulkUpdateRequest(messages: Seq[IncomingMessage[T]]): Unit = { | ||
val json = messages | ||
.map { message => | ||
JsObject( | ||
"index" -> JsObject( | ||
Seq( | ||
Option("_index" -> JsString(indexName)), | ||
Option("_type" -> JsString(typeName)), | ||
message.id.map { id => | ||
"_id" -> JsString(id) | ||
} | ||
).flatten: _* | ||
) | ||
).toString + "\n" + writer.convert(message.source) | ||
} | ||
.mkString("", "\n", "\n") | ||
|
||
client.performRequestAsync( | ||
"POST", | ||
"/_bulk", | ||
Map[String, String]().asJava, | ||
new StringEntity(json), | ||
new ResponseListener() { | ||
override def onFailure(exception: Exception): Unit = | ||
failureHandler.invoke((messages, exception)) | ||
override def onSuccess(response: Response): Unit = | ||
responseHandler.invoke((messages, response)) | ||
}, | ||
new BasicHeader("Content-Type", "application/x-ndjson") | ||
) | ||
} | ||
|
||
setHandlers(in, out, this) | ||
|
||
override def onPull(): Unit = tryPull() | ||
|
||
override def onPush(): Unit = { | ||
val message = grab(in) | ||
queue.enqueue(message) | ||
|
||
state match { | ||
case Idle => { | ||
state = Sending | ||
val messages = (1 to settings.bufferSize).flatMap { _ => | ||
queue.dequeueFirst(_ => true) | ||
} | ||
sendBulkUpdateRequest(messages) | ||
} | ||
case _ => () | ||
} | ||
|
||
tryPull() | ||
} | ||
|
||
override def onUpstreamFailure(exception: Throwable): Unit = | ||
failStage(exception) | ||
|
||
override def onUpstreamFinish(): Unit = | ||
state match { | ||
case Idle => handleSuccess() | ||
case Sending => state = Finished | ||
case Finished => () | ||
} | ||
} | ||
|
||
} | ||
|
||
object ElasticsearchFlowStage { | ||
|
||
private sealed trait State | ||
private case object Idle extends State | ||
private case object Sending extends State | ||
private case object Finished extends State | ||
|
||
} |
Oops, something went wrong.