Skip to content

Commit

Permalink
#34 Replace ElasticSearch TCP client by HTTP client
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jan 26, 2018
1 parent 06d45ec commit 5e2e12b
Show file tree
Hide file tree
Showing 36 changed files with 393 additions and 319 deletions.
16 changes: 6 additions & 10 deletions app/org/elastic4play/ErrorHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@ import scala.concurrent.Future

import play.api.Logger
import play.api.http.{ HttpErrorHandler, Status, Writeable }
import play.api.libs.json.{ JsNull, JsValue, Json }
import play.api.libs.json.{ JsValue, Json }
import play.api.mvc.{ RequestHeader, ResponseHeader, Result, Results }

import org.elasticsearch.client.transport.NoNodeAvailableException
import org.elasticsearch.index.IndexNotFoundException
import org.elasticsearch.index.query.QueryShardException

import org.elastic4play.JsonFormat.attributeCheckingExceptionWrites

/**
Expand All @@ -33,10 +29,10 @@ class ErrorHandler extends HttpErrorHandler {
case nfe: NumberFormatException Some(Status.BAD_REQUEST Json.obj("type" "NumberFormatException", "message" ("Invalid format " + nfe.getMessage)))
case NotFoundError(message) Some(Status.NOT_FOUND Json.obj("type" "NotFoundError", "message" message))
case BadRequestError(message) Some(Status.BAD_REQUEST Json.obj("type" "BadRequest", "message" message))
case SearchError(message, cause) Some(Status.BAD_REQUEST Json.obj("type" "SearchError", "message" s"$message (${cause.getMessage})"))
case SearchError(message, cause, _) Some(Status.BAD_REQUEST Json.obj("type" "SearchError", "message" s"$message (${cause.getMessage})"))
case ace: AttributeCheckingError Some(Status.BAD_REQUEST Json.toJson(ace))
case iae: IllegalArgumentException Some(Status.BAD_REQUEST Json.obj("type" "IllegalArgument", "message" iae.getMessage))
case _: NoNodeAvailableException Some(Status.INTERNAL_SERVER_ERROR Json.obj("type" "NoNodeAvailable", "message" "ElasticSearch cluster is unreachable"))
// FIXME case _: NoNodeAvailableException ⇒ Some(Status.INTERNAL_SERVER_ERROR → Json.obj("type" → "NoNodeAvailable", "message" → "ElasticSearch cluster is unreachable"))
case CreateError(_, message, attributes) Some(Status.INTERNAL_SERVER_ERROR Json.obj("type" "CreateError", "message" message, "object" attributes))
case ErrorWithObject(tpe, message, obj) Some(Status.BAD_REQUEST Json.obj("type" tpe, "message" message, "object" obj))
case GetError(message) Some(Status.INTERNAL_SERVER_ERROR Json.obj("type" "GetError", "message" message))
Expand All @@ -45,9 +41,9 @@ class ErrorHandler extends HttpErrorHandler {
case Some((_, j)) j
}
Some(Status.MULTI_STATUS Json.obj("type" "MultiError", "error" message, "suberrors" suberrors))
case _: IndexNotFoundException Some(520 JsNull)
case qse: QueryShardException Some(Status.BAD_REQUEST Json.obj("type" "Invalid search query", "message" qse.getMessage))
case t: Throwable Option(t.getCause).flatMap(toErrorResult)
// FIXME case _: IndexNotFoundException ⇒ Some(520 → JsNull)
// FIXME case qse: QueryShardException ⇒ Some(Status.BAD_REQUEST → Json.obj("type" → "Invalid search query", "message" → qse.getMessage))
case t: Throwable Option(t.getCause).flatMap(toErrorResult)
}
}

Expand Down
9 changes: 8 additions & 1 deletion app/org/elastic4play/Errors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package org.elastic4play

import play.api.libs.json.{ JsObject, JsValue }

import com.sksamuel.elastic4s.http.ElasticError

import org.elastic4play.controllers.InputValue

class ErrorWithObject(message: String, val obj: JsObject) extends Exception(message)
Expand All @@ -16,7 +18,12 @@ case class NotFoundError(message: String) extends Exception(message)
case class GetError(message: String) extends Exception(message)
case class UpdateError(status: Option[String], message: String, attributes: JsObject) extends ErrorWithObject(message, attributes)
case class InternalError(message: String) extends Exception(message)
case class SearchError(message: String, cause: Throwable) extends Exception(message, cause)
case class SearchError(message: String, cause: Throwable, elasticError: ElasticError) extends Exception(message, cause)
object SearchError {
def apply(error: ElasticError): SearchError = SearchError(s"${error.`type`} ${error.reason}", null, error)
def apply(message: String): SearchError = SearchError(message, null, null)
def apply(message: String, cause: Throwable): SearchError = SearchError(message, cause, null)
}
case class AuthenticationError(message: String) extends Exception(message)
case class OAuth2Redirect(redirectUrl: String, params: Map[String, Seq[String]]) extends Exception(redirectUrl)
case class AuthorizationError(message: String) extends Exception(message)
Expand Down
140 changes: 89 additions & 51 deletions app/org/elastic4play/database/DBConfiguration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,37 @@ package org.elastic4play.database

import javax.inject.{ Inject, Named, Singleton }

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration._

import play.api.inject.ApplicationLifecycle
import play.api.{ Configuration, Logger }

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.admin.IndexExistsDefinition
import com.sksamuel.elastic4s.bulk.RichBulkItemResponse
import com.sksamuel.elastic4s.ElasticsearchClientUri
import com.sksamuel.elastic4s.admin.IndicesExists
import com.sksamuel.elastic4s.cluster.ClusterHealthDefinition
import com.sksamuel.elastic4s.delete.DeleteByIdDefinition
import com.sksamuel.elastic4s.get.{ GetDefinition, RichGetResponse }
import com.sksamuel.elastic4s.index.RichIndexResponse
import com.sksamuel.elastic4s.get.GetDefinition
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.http.{ HttpClient, RequestFailure, RequestSuccess }
import com.sksamuel.elastic4s.http.bulk.BulkResponseItem
import com.sksamuel.elastic4s.http.cluster.ClusterHealthResponse
import com.sksamuel.elastic4s.http.delete.DeleteResponse
import com.sksamuel.elastic4s.http.get.GetResponse
import com.sksamuel.elastic4s.http.index.admin.IndexExistsResponse
import com.sksamuel.elastic4s.http.index.{ CreateIndexResponse, IndexResponse }
import com.sksamuel.elastic4s.http.search.{ ClearScrollResponse, SearchHit, SearchResponse }
import com.sksamuel.elastic4s.http.update.UpdateResponse
import com.sksamuel.elastic4s.indexes.{ CreateIndexDefinition, IndexDefinition }
import com.sksamuel.elastic4s.searches._
import com.sksamuel.elastic4s.streams.ReactiveElastic.ReactiveElastic
import com.sksamuel.elastic4s.streams.{ RequestBuilder, ResponseListener }
import com.sksamuel.elastic4s.update.{ RichUpdateResponse, UpdateDefinition }
import com.sksamuel.elastic4s.{ ElasticsearchClientUri, TcpClient }
import com.sksamuel.elastic4s.xpack.security.XPackElasticClient
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse
import org.elasticsearch.action.delete.DeleteResponse
import org.elasticsearch.common.settings.Settings
import com.sksamuel.elastic4s.update.UpdateDefinition
import com.sksamuel.elastic4s.streams.ReactiveElastic._

import org.elastic4play.Timed
import org.elastic4play.{ SearchError, Timed }

/**
* This class is a wrapper of ElasticSearch client from Elastic4s
Expand All @@ -43,8 +44,8 @@ class DBConfiguration(
searchHost: Seq[String],
searchCluster: String,
baseIndexName: String,
xpackUsername: Option[String],
xpackPassword: Option[String],
xpackUsername: Option[String], // FIXME
xpackPassword: Option[String], //FIXME
lifecycle: ApplicationLifecycle,
val version: Int,
implicit val ec: ExecutionContext,
Expand All @@ -70,66 +71,103 @@ class DBConfiguration(

private[DBConfiguration] lazy val logger = Logger(getClass)

private def connect(): TcpClient = {
val uri = ElasticsearchClientUri(s"elasticsearch://${searchHost.mkString(",")}")
val settings = Settings.builder()
settings.put("cluster.name", searchCluster)

val xpackClient = for {
username xpackUsername
if username.nonEmpty
password xpackPassword
if password.nonEmpty
_ = settings.put("xpack.security.user", s"$username:$password")
} yield XPackElasticClient(settings.build(), uri)

xpackClient.getOrElse(TcpClient.transport(settings.build(), uri))
}
// FIXME
// private def connect(): TcpClient = {
// val uri = ElasticsearchClientUri(s"elasticsearch://${searchHost.mkString(",")}")
// val settings = Settings.builder()
// settings.put("cluster.name", searchCluster)
//
// val xpackClient = for {
// username ← xpackUsername
// if username.nonEmpty
// password ← xpackPassword
// if password.nonEmpty
// _ = settings.put("xpack.security.user", s"$username:$password")
// } yield XPackElasticClient(settings.build(), uri)
//
// xpackClient.getOrElse(TcpClient.transport(settings.build(), uri))
// }

/**
* Underlying ElasticSearch client
*/
private[database] val client = connect()
private[database] val client = {
val uri = ElasticsearchClientUri("elasticsearch://" + searchHost.mkString(","))
HttpClient(uri.copy(options = uri.options + ("cluster.name" -> searchCluster)))
}
// when application close, close also ElasticSearch connection
lifecycle.addStopHook { () Future { client.close() } }

private def processResponse[R](response: Future[Either[RequestFailure, RequestSuccess[R]]]): Future[R] = {
response.flatMap { response
response.fold[Future[R]](
r Future.failed(SearchError(r.error)),
r Future.successful(r.result))
}
}
@Timed("database.index")
def execute(indexDefinition: IndexDefinition): Future[RichIndexResponse] = client.execute(indexDefinition)
def execute(indexDefinition: IndexDefinition): Future[IndexResponse] = {
processResponse(client.execute(indexDefinition))
}
@Timed("database.search")
def execute(searchDefinition: SearchDefinition): Future[RichSearchResponse] = client.execute(searchDefinition)
def execute(searchDefinition: SearchDefinition): Future[SearchResponse] = {
processResponse(client.execute(searchDefinition))
}
@Timed("database.create")
def execute(createIndexDefinition: CreateIndexDefinition): Future[CreateIndexResponse] = client.execute(createIndexDefinition)
def execute(createIndexDefinition: CreateIndexDefinition): Future[CreateIndexResponse] = {
processResponse(client.execute(createIndexDefinition))
}
@Timed("database.update")
def execute(updateDefinition: UpdateDefinition): Future[RichUpdateResponse] = client.execute(updateDefinition)
def execute(updateDefinition: UpdateDefinition): Future[UpdateResponse] = {
processResponse(client.execute(updateDefinition))
}
@Timed("database.search_scroll")
def execute(searchScrollDefinition: SearchScrollDefinition): Future[RichSearchResponse] = client.execute(searchScrollDefinition)
def execute(searchScrollDefinition: SearchScrollDefinition): Future[SearchResponse] = {
processResponse(client.execute(searchScrollDefinition))
}
@Timed("database.index_exists")
def execute(indexExistsDefinition: IndexExistsDefinition): Future[IndicesExistsResponse] = client.execute(indexExistsDefinition)
def execute(indicesExists: IndicesExists): Future[IndexExistsResponse] = {
processResponse(client.execute(indicesExists))
}
@Timed("database.delete")
def execute(deleteByIdDefinition: DeleteByIdDefinition): Future[DeleteResponse] = client.execute(deleteByIdDefinition)
def execute(deleteByIdDefinition: DeleteByIdDefinition): Future[DeleteResponse] = {
processResponse(client.execute(deleteByIdDefinition))
}
@Timed("database.get")
def execute(getDefinition: GetDefinition): Future[RichGetResponse] = client.execute(getDefinition)
def execute(getDefinition: GetDefinition): Future[Either[RequestFailure, RequestSuccess[GetResponse]]] = client.execute(getDefinition)
@Timed("database.clear_scroll")
def execute(clearScrollDefinition: ClearScrollDefinition): Future[ClearScrollResult] = client.execute(clearScrollDefinition)
def execute(clearScrollDefinition: ClearScrollDefinition): Future[Either[RequestFailure, RequestSuccess[ClearScrollResponse]]] = client.execute(clearScrollDefinition)
@Timed("database.cluster_health")
def execute(clusterHealthDefinition: ClusterHealthDefinition): Future[ClusterHealthResponse] = client.execute(clusterHealthDefinition)
def execute(clusterHealthDefinition: ClusterHealthDefinition): Future[ClusterHealthResponse] = {
processResponse(client.execute(clusterHealthDefinition))
}

/**
* Creates a Source (akka stream) from the result of the search
*/
def source(searchDefinition: SearchDefinition): Source[RichSearchHit, NotUsed] = Source.fromPublisher(client.publisher(searchDefinition))
def source(searchDefinition: SearchDefinition): Source[SearchHit, NotUsed] = Source.fromPublisher(client.publisher(searchDefinition))

private lazy val sinkListener = new ResponseListener {
override def onAck(resp: RichBulkItemResponse): Unit = ()
override def onFailure(resp: RichBulkItemResponse): Unit = {
logger.warn(s"Document index failure ${resp.id}: ${resp.failureMessage}")
}
}
// private lazy val sinkListener = new ResponseListener[T] {
// override def onAck(resp: BulkResponseItem, original: T): Unit
// def onFailure(resp: BulkResponseItem, original: T): Unit = ()
// override def onAck(resp: RichBulkItemResponse): Unit = ()
// override def onFailure(resp: RichBulkItemResponse): Unit = {
// logger.warn(s"Document index failure ${resp.id}: ${resp.failureMessage}")
// }
// }

/**
* Create a Sink (akka stream) that create entity in ElasticSearch
*/
def sink[T](implicit builder: RequestBuilder[T]): Sink[T, Future[Unit]] = {
val sinkListener = new ResponseListener[T] {
override def onAck(resp: BulkResponseItem, original: T): Unit = ()
override def onFailure(resp: BulkResponseItem, original: T): Unit = {
val errorMessage = resp.error.fold("unknown")(_.reason)
logger.warn(s"Document index failure ${resp.id}: $errorMessage")
}
}

val end = Promise[Unit]
val complete = () {
if (!end.isCompleted)
Expand Down
36 changes: 20 additions & 16 deletions app/org/elastic4play/database/DBCreate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ import play.api.libs.json.JsValue.jsValueToJsLookup
import play.api.libs.json._

import akka.stream.scaladsl.Sink
import com.sksamuel.elastic4s.ElasticDsl.indexInto
import com.sksamuel.elastic4s.RefreshPolicy
import com.sksamuel.elastic4s.http.ElasticDsl.indexInto
import com.sksamuel.elastic4s.http.ElasticError
import com.sksamuel.elastic4s.indexes.IndexDefinition
import com.sksamuel.elastic4s.streams.RequestBuilder
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy
import org.elasticsearch.index.engine.VersionConflictEngineException
import org.elasticsearch.transport.RemoteTransportException

import org.elastic4play.models.BaseEntity
import org.elastic4play.{ ConflictError, CreateError, InternalError }
import org.elastic4play.{ InternalError, SearchError }

/**
* Service lass responsible for entity creation
Expand Down Expand Up @@ -66,22 +65,27 @@ class DBCreate @Inject() (
indexInto(db.indexName, modelName).source(docSource).refresh(RefreshPolicy.WAIT_UNTIL)
}
}
.transform(
indexResponse attributes +
.map { indexResponse
attributes +
("_type" JsString(modelName)) +
("_id" JsString(indexResponse.id)) +
("_parent" parentId.fold[JsValue](JsNull)(JsString)) +
("_routing" JsString(routing.getOrElse(indexResponse.id))) +
("_version" -> JsNumber(indexResponse.version)),
convertError(attributes, _))
("_version" -> JsNumber(indexResponse.version)) +
("_routing" JsString(routing.getOrElse(indexResponse.id)))
}
.recoverWith {
case SearchError(_, _, error) Future.failed(convertError(attributes, error))
}
}

private[database] def convertError(attributes: JsObject, error: Throwable): Throwable = error match {
case rte: RemoteTransportException convertError(attributes, rte.getCause)
case vcee: VersionConflictEngineException ConflictError(vcee.getMessage, attributes)
case other
logger.warn("create error", other)
CreateError(None, other.getMessage, attributes)
private[database] def convertError(attributes: JsObject, error: ElasticError): Throwable = error match {
// case ElasticError(???rte: RemoteTransportException ⇒ convertError(attributes, rte.getCause)
// case vcee: VersionConflictEngineException ⇒ ConflictError(vcee.getMessage, attributes)
// case other ⇒
// logger.warn("create error", other)
// CreateError(None, other.getMessage, attributes)
// FIXME
case _ ???
}

/**
Expand Down
Loading

0 comments on commit 5e2e12b

Please sign in to comment.