Skip to content

Commit

Permalink
Elasticsearch: use proper FlowWithContext (#1766)
Browse files Browse the repository at this point in the history
Elasticsearch: use proper FlowWithContext
  • Loading branch information
2m authored Jun 20, 2019
2 parents 872d76d + b2c1d5b commit c137fcf
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 20 deletions.
3 changes: 3 additions & 0 deletions elasticsearch/src/main/mima-filters/1.0.2.backwards.excludes
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# PR #1766 use proper FlowWithContext
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchFlow.createWithContext")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchFlow.createWithContext")
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package akka.stream.alpakka.elasticsearch.javadsl

import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.japi.Pair
import akka.stream.alpakka.elasticsearch.{impl, _}
import akka.stream.scaladsl
import com.fasterxml.jackson.databind.ObjectMapper
Expand Down Expand Up @@ -89,12 +88,9 @@ object ElasticsearchFlow {
settings: ElasticsearchWriteSettings,
elasticsearchClient: RestClient,
objectMapper: ObjectMapper
): akka.stream.javadsl.Flow[Pair[WriteMessage[T, NotUsed], C], Pair[WriteResult[T, C], C], NotUsed] =
): akka.stream.javadsl.FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] =
scaladsl
.Flow[Pair[WriteMessage[T, NotUsed], C]]
.map { pair =>
pair.first.withPassThrough(pair.second)
}
.Flow[WriteMessage[T, C]]
.batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+ wm }
.via(
new impl.ElasticsearchFlowStage[T, C](indexName,
Expand All @@ -104,9 +100,9 @@ object ElasticsearchFlow {
new JacksonWriter[T](objectMapper))
)
.mapConcat(identity)
.map { wr =>
Pair.create(wr, wr.message.passThrough)
}
.asFlowWithContext[WriteMessage[T, NotUsed], C, C]((res, c) => res.withPassThrough(c))(
p => p.message.passThrough
)
.asJava

private final class JacksonWriter[T](mapper: ObjectMapper) extends MessageWriter[T] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.stream.alpakka.elasticsearch._
import akka.stream.alpakka.elasticsearch.impl
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.{Flow, FlowWithContext}
import org.elasticsearch.client.RestClient
import spray.json._

Expand Down Expand Up @@ -120,7 +120,7 @@ object ElasticsearchFlow {
settings: ElasticsearchWriteSettings = ElasticsearchWriteSettings())(
implicit elasticsearchClient: RestClient,
sprayJsonWriter: JsonWriter[T]
): Flow[(WriteMessage[T, NotUsed], C), (WriteResult[T, C], C), NotUsed] =
): FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] =
createWithContext[T, C](indexName, typeName, settings, new SprayJsonWriter[T]()(sprayJsonWriter))

/**
Expand All @@ -137,18 +137,14 @@ object ElasticsearchFlow {
settings: ElasticsearchWriteSettings,
writer: MessageWriter[T])(
implicit elasticsearchClient: RestClient
): Flow[(WriteMessage[T, NotUsed], C), (WriteResult[T, C], C), NotUsed] = {
): FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] = {
require(settings.retryLogic == RetryNever,
"`withContext` may not be used with retrying enabled, as it disturbs element order")
Flow[(WriteMessage[T, NotUsed], C)]
.map {
case (wm, pt) =>
wm.withPassThrough(pt)
}
Flow[WriteMessage[T, C]]
.via(createWithPassThrough(indexName, typeName, settings, writer))
.map { wr =>
(wr, wr.message.passThrough)
}
.asFlowWithContext[WriteMessage[T, NotUsed], C, C]((res, c) => res.withPassThrough(c))(
p => p.message.passThrough
)
}

private final class SprayJsonWriter[T](implicit writer: JsonWriter[T]) extends MessageWriter[T] {
Expand Down

0 comments on commit c137fcf

Please sign in to comment.