Skip to content

Commit

Permalink
#21 Add ability to create different document types in stream sink
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jun 29, 2017
1 parent 2ad9118 commit 7f71b5e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 25 deletions.
34 changes: 15 additions & 19 deletions app/org/elastic4play/database/DBCreate.scala
Original file line number Diff line number Diff line change
@@ -1,29 +1,24 @@
package org.elastic4play.database

import javax.inject.{ Inject, Singleton }

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
import javax.inject.{Inject, Singleton}

import akka.stream.scaladsl.Sink

import play.api.Logger
import play.api.libs.json.{ JsNull, JsObject, JsString, JsValue }
import play.api.libs.json.JsValue.jsValueToJsLookup

import org.elasticsearch.action.index.IndexResponse
import org.elasticsearch.transport.RemoteTransportException

import com.sksamuel.elastic4s.ElasticDsl.{ bulk, index }
import com.sksamuel.elastic4s.ElasticDsl.{bulk, index}
import com.sksamuel.elastic4s.IndexAndTypes.apply
import com.sksamuel.elastic4s.IndexDefinition
import com.sksamuel.elastic4s.source.JsonDocumentSource
import com.sksamuel.elastic4s.streams.RequestBuilder

import org.elastic4play.{ CreateError, Timed }
import org.elastic4play.models.BaseEntity
import org.elastic4play.{ConflictError, CreateError, InternalError}
import org.elasticsearch.action.index.IndexResponse
import org.elasticsearch.index.engine.DocumentAlreadyExistsException
import org.elastic4play.ConflictError
import org.elasticsearch.transport.RemoteTransportException
import play.api.Logger
import play.api.libs.json.JsValue.jsValueToJsLookup
import play.api.libs.json.{JsNull, JsObject, JsString, JsValue}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

/**
* Service lass responsible for entity creation
Expand Down Expand Up @@ -171,20 +166,21 @@ class DBCreate @Inject() (
* Class used to build index definition based on model name and attributes
* This class is used by sink (ElasticSearch reactive stream)
*/
private class AttributeRequestBuilder(modelName: String) extends RequestBuilder[JsObject] {
private class AttributeRequestBuilder() extends RequestBuilder[JsObject] {
override def request(attributes: JsObject): IndexDefinition = {
val docSource = JsonDocumentSource(JsObject(attributes.fields.filterNot(_._1.startsWith("_"))).toString)
val id = (attributes \ "_id").asOpt[String]
val parent = (attributes \ "_parent").asOpt[String]
val routing = (attributes \ "_routing").asOpt[String] orElse parent orElse id
val modelName = (attributes \ "_type").asOpt[String].getOrElse(throw InternalError("The entity doesn't contain _type attribute"))
addId(id).andThen(addParent(parent)).andThen(addRouting(routing)) {
index into db.indexName modelName doc docSource update true
index.into(db.indexName modelName).doc(docSource).update(true)
}
}
}

/**
* build a akka stream sink that create entities
*/
def sink(modelName: String): Sink[JsObject, Future[Unit]] = db.sink(new AttributeRequestBuilder(modelName))
def sink(): Sink[JsObject, Future[Unit]] = db.sink(new AttributeRequestBuilder())
}
13 changes: 7 additions & 6 deletions app/org/elastic4play/services/MigrationSrv.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package org.elastic4play.services

import javax.inject.{ Inject, Singleton }
import javax.inject.{Inject, Singleton}

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.scaladsl.{Sink, Source}
import com.sksamuel.elastic4s.ElasticDsl.search
import com.sksamuel.elastic4s.IndexesAndTypes.apply
import org.elastic4play.InternalError
import org.elastic4play.database._
import play.api.Logger
import play.api.libs.json.JsValue.jsValueToJsLookup
import play.api.libs.json._
import play.api.libs.json.Json.toJsFieldJsValueWrapper
import play.api.libs.json._

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success }
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

case class MigrationEvent(modelName: String, current: Long, total: Long) extends EventMessage
case object EndOfMigrationEvent extends EventMessage
Expand Down Expand Up @@ -118,7 +118,7 @@ class MigrationSrv @Inject() (
eventSrv.publish(MigrationEvent(modelName, current.toLong, total))
entity
}
.runWith(dbcreate.sink(modelName))
.runWith(dbcreate.sink())
r.onComplete { x
println(s"migrateEntity($modelName) has finished : $x")
}
Expand Down Expand Up @@ -164,6 +164,7 @@ class MigrationSrv @Inject() (
}

def isMigrating: Boolean = !migrationProcess.isCompleted
def isReady: Boolean = dbindex.indexStatus && !migrationProcess.isCompleted
}
/* Operation applied to the previous state of the database to get next version */
trait Operation extends ((String Source[JsObject, NotUsed]) (String Source[JsObject, NotUsed]))
Expand Down

0 comments on commit 7f71b5e

Please sign in to comment.