Skip to content

Commit

Permalink
#82 Fix ElasticSearch configuraiton
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed May 15, 2019
1 parent 16c337b commit da0dbe2
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 54 deletions.
94 changes: 55 additions & 39 deletions app/org/elastic4play/database/DBConfiguration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import com.sksamuel.elastic4s.streams.ReactiveElastic.ReactiveElastic
import com.sksamuel.elastic4s.streams.{RequestBuilder, ResponseListener}
import javax.inject.{Inject, Named, Singleton}
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.client.CredentialsProvider
import org.apache.http.client.config.RequestConfig
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, RequestConfigCallback}

Expand All @@ -43,8 +46,8 @@ class DBConfiguration @Inject()(
) {
private[DBConfiguration] lazy val logger = Logger(getClass)

def requestConfig: RequestConfigCallback = (requestConfigBuilder: RequestConfig.Builder) {
config.getOptional[Boolean]("search.authenticationEnabled").foreach(requestConfigBuilder.setAuthenticationEnabled)
def requestConfigCallback: RequestConfigCallback = (requestConfigBuilder: RequestConfig.Builder) {
requestConfigBuilder.setAuthenticationEnabled(credentialsProviderMaybe.isDefined)
config.getOptional[Boolean]("search.circularRedirectsAllowed").foreach(requestConfigBuilder.setCircularRedirectsAllowed)
config.getOptional[Int]("search.connectionRequestTimeout").foreach(requestConfigBuilder.setConnectionRequestTimeout)
config.getOptional[Int]("search.connectTimeout").foreach(requestConfigBuilder.setConnectTimeout)
Expand All @@ -62,53 +65,66 @@ class DBConfiguration @Inject()(
requestConfigBuilder
}

def httpClientConfig: HttpClientConfigCallback = (httpClientBuilder: HttpAsyncClientBuilder) {
config.getOptional[String]("search.keyStore.path").map { keyStore
val keyStorePath = Paths.get(keyStore)
val keyStoreType = config.getOptional[String]("search.keyStore.type").getOrElse(KeyStore.getDefaultType)
val keyStorePassword = config.getOptional[String]("search.keyStore.password").getOrElse("").toCharArray
val keyInputStream = Files.newInputStream(keyStorePath)
val keyManagers = try {
val keyStore = KeyStore.getInstance(keyStoreType)
keyStore.load(keyInputStream, keyStorePassword)
val kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
kmf.init(keyStore, keyStorePassword)
kmf.getKeyManagers
} finally {
keyInputStream.close()
}
lazy val credentialsProviderMaybe: Option[CredentialsProvider] =
for {
user config.getOptional[String]("search.user")
password config.getOptional[String]("search.password")
} yield {
val provider = new BasicCredentialsProvider
val credentials = new UsernamePasswordCredentials(user, password)
provider.setCredentials(AuthScope.ANY, credentials)
provider
}

lazy val sslContextMaybe: Option[SSLContext] = config.getOptional[String]("search.keyStore.path").map { keyStore
val keyStorePath = Paths.get(keyStore)
val keyStoreType = config.getOptional[String]("search.keyStore.type").getOrElse(KeyStore.getDefaultType)
val keyStorePassword = config.getOptional[String]("search.keyStore.password").getOrElse("").toCharArray
val keyInputStream = Files.newInputStream(keyStorePath)
val keyManagers = try {
val keyStore = KeyStore.getInstance(keyStoreType)
keyStore.load(keyInputStream, keyStorePassword)
val kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
kmf.init(keyStore, keyStorePassword)
kmf.getKeyManagers
} finally {
keyInputStream.close()
}

val trustManagers = config
.getOptional[String]("search.trustStore.path")
.map { trustStorePath
val keyStoreType = config.getOptional[String]("search.keyStore.type").getOrElse(KeyStore.getDefaultType)
val trustStorePassword = config.getOptional[String]("search.trustStore.password").getOrElse("").toCharArray
val trustInputStream = Files.newInputStream(Paths.get(trustStorePath))
try {
val keyStore = KeyStore.getInstance(keyStoreType)
keyStore.load(trustInputStream, trustStorePassword)
val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
tmf.init(keyStore)
tmf.getTrustManagers
} finally {
trustInputStream.close()
}
val trustManagers = config
.getOptional[String]("search.trustStore.path")
.map { trustStorePath
val keyStoreType = config.getOptional[String]("search.trustStore.type").getOrElse(KeyStore.getDefaultType)
val trustStorePassword = config.getOptional[String]("search.trustStore.password").getOrElse("").toCharArray
val trustInputStream = Files.newInputStream(Paths.get(trustStorePath))
try {
val keyStore = KeyStore.getInstance(keyStoreType)
keyStore.load(trustInputStream, trustStorePassword)
val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
tmf.init(keyStore)
tmf.getTrustManagers
} finally {
trustInputStream.close()
}
.getOrElse(Array.empty)
}
.getOrElse(Array.empty)

// Configure the SSL context to use TLS
val sslContext = SSLContext.getInstance("TLS")
sslContext.init(keyManagers, trustManagers, null)
// Configure the SSL context to use TLS
val sslContext = SSLContext.getInstance("TLS")
sslContext.init(keyManagers, trustManagers, null)
sslContext
}

httpClientBuilder.setSSLContext(sslContext)
}
def httpClientConfig: HttpClientConfigCallback = (httpClientBuilder: HttpAsyncClientBuilder) {
sslContextMaybe.foreach(httpClientBuilder.setSSLContext)
credentialsProviderMaybe.foreach(httpClientBuilder.setDefaultCredentialsProvider)
httpClientBuilder
}

/**
* Underlying ElasticSearch client
*/
private[database] val client = ElasticClient(ElasticProperties(config.get[String]("search.uri")), requestConfig, httpClientConfig)
private[database] val client = ElasticClient(ElasticProperties(config.get[String]("search.uri")), requestConfigCallback, httpClientConfig)
// when application close, close also ElasticSearch connection
lifecycle.addStopHook { ()
Future {
Expand Down
1 change: 0 additions & 1 deletion app/org/elastic4play/database/DBCreate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import com.sksamuel.elastic4s.indexes.IndexRequest
import com.sksamuel.elastic4s.streams.RequestBuilder
import javax.inject.{Inject, Singleton}

import org.elastic4play.CreateError
import org.elastic4play.models.BaseEntity

/**
Expand Down
6 changes: 3 additions & 3 deletions app/org/elastic4play/database/DBFind.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class DBFind(pageSize: Int, keepAlive: FiniteDuration, db: DBConfiguration, impl
db.execute(searchRequest)
.recoverWith {
case t if t == IndexNotFoundException Future.failed(t)
case t Future.failed(SearchError("Invalid search query", t))
case t Future.failed(SearchError("Invalid search query"))
}
}
}
Expand Down Expand Up @@ -173,7 +173,7 @@ class SearchWithScroll(db: DBConfiguration, SearchRequest: SearchRequest, keepAl
val callback = getAsyncCallback[Try[SearchResponse]] {
case Success(searchResponse) if searchResponse.isTimedOut
logger.warn("Search timeout")
failStage(SearchError("Request terminated early or timed out", null))
failStage(SearchError("Request terminated early or timed out"))
case Success(searchResponse) if searchResponse.isEmpty
completeStage()
case Success(searchResponse) if skip > 0
Expand All @@ -190,7 +190,7 @@ class SearchWithScroll(db: DBConfiguration, SearchRequest: SearchRequest, keepAl
pushNextHit()
case Failure(error)
logger.warn("Search error", error)
failStage(SearchError("Request terminated early or timed out", error))
failStage(SearchError("Request terminated early or timed out"))
}
val futureSearchResponse = scrollId.flatMap(s db.execute(searchScroll(s).keepAlive(keepAliveStr)))
scrollId = futureSearchResponse.map(_.scrollId.get)
Expand Down
37 changes: 26 additions & 11 deletions app/org/elastic4play/services/MigrationSrv.scala
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
package org.elastic4play.services

import javax.inject.{Inject, Singleton}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import play.api.{Configuration, Logger}
import play.api.libs.json.JsValue.jsValueToJsLookup
import play.api.libs.json.Json.toJsFieldJsValueWrapper
import play.api.libs.json._
import play.api.{Configuration, Logger}

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer}
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.typesafe.config.Config
import javax.inject.{Inject, Singleton}

import org.elastic4play.InternalError
import org.elastic4play.database._

case class MigrationEvent(modelName: String, current: Long, total: Long) extends EventMessage
case object EndOfMigrationEvent extends EventMessage

case object EndOfMigrationEvent extends EventMessage

object IndexType extends Enumeration {
val indexWithMappingTypes, indexWithoutMappingTypes = Value
Expand Down Expand Up @@ -115,6 +115,7 @@ class MigrationSrv @Inject()(
("_version" JsNumber(hit.version))
}
}

override def count(tableName: String): Future[Long] =
db.execute {
search(db.indexName / tableName).matchAllQuery().size(0)
Expand All @@ -123,6 +124,7 @@ class MigrationSrv @Inject()(
_.totalHits
}
.recover { case _ 0L }

override def getEntity(tableName: String, entityId: String): Future[JsObject] = currentdbget(tableName, entityId)
}

Expand Down Expand Up @@ -170,7 +172,10 @@ class MigrationSrv @Inject()(
val r = entities
.zipWith(count) { (entity, current)
eventSrv.publish(MigrationEvent(modelName, current.toLong, total))
entity
(entity \ "_type").asOpt[JsString].fold(entity) { t
val relations = (entity \ "_parent").asOpt[JsString].fold[JsValue](t)(p Json.obj("name" t, "parent" p))
entity - "_type" - "_parent" + ("relations" relations)
}
}
.runWith(dbcreate.sink())
r.onComplete { x
Expand Down Expand Up @@ -230,6 +235,7 @@ class MigrationSrv @Inject()(
def isMigrating: Boolean = !migrationProcess.isCompleted
def isReady: Boolean = dbindex.indexStatus && migrationProcess.isCompleted
}

/* Operation applied to the previous state of the database to get next version */
trait Operation extends ((String Source[JsObject, NotUsed]) (String Source[JsObject, NotUsed]))

Expand Down Expand Up @@ -259,7 +265,8 @@ object Operation {
})

def mapEntity(tables: String*)(transform: JsObject JsObject): Operation = mapEntity(tables.contains, transform)
def apply(table: String)(transform: JsObject JsObject): Operation = mapEntity(_ == table, transform)

def apply(table: String)(transform: JsObject JsObject): Operation = mapEntity(_ == table, transform)

def removeEntity(tableFilter: String Boolean, filter: JsObject Boolean): Operation =
Operation((f: String Source[JsObject, NotUsed]) {
Expand All @@ -268,7 +275,8 @@ object Operation {
})

def removeEntity(tables: String*)(filter: JsObject Boolean): Operation = removeEntity(tables.contains, filter)
def removeEntity(table: String)(filter: JsObject Boolean): Operation = removeEntity(_ == table, filter)

def removeEntity(table: String)(filter: JsObject Boolean): Operation = removeEntity(_ == table, filter)

def renameAttribute(tableFilter: String Boolean, newName: String, oldNamePath: Seq[String]): Operation =
Operation((f: String Source[JsObject, NotUsed]) {
Expand All @@ -283,6 +291,7 @@ object Operation {

def renameAttribute(tables: Seq[String], newName: String, oldNamePath: String*): Operation =
renameAttribute(a tables.contains(a), newName, oldNamePath)

def renameAttribute(table: String, newName: String, oldNamePath: String*): Operation = renameAttribute(_ == table, newName, oldNamePath)

def rename(value: JsObject, newName: String, path: Seq[String]): JsObject =
Expand Down Expand Up @@ -310,6 +319,7 @@ object Operation {

def mapAttribute(tables: Seq[String], attribute: String)(transform: JsValue JsValue): Operation =
mapAttribute(a tables.contains(a), attribute, transform)

def mapAttribute(table: String, attribute: String)(transform: JsValue JsValue): Operation = mapAttribute(_ == table, attribute, transform)

def removeAttribute(tableFilter: String Boolean, attributes: String*): Operation =
Expand All @@ -320,8 +330,10 @@ object Operation {
y - a
}
)

def removeAttribute(tables: Seq[String], attributes: String*): Operation = removeAttribute(a tables.contains(a), attributes: _*)
def removeAttribute(table: String, attributes: String*): Operation = removeAttribute(_ == table, attributes: _*)

def removeAttribute(table: String, attributes: String*): Operation = removeAttribute(_ == table, attributes: _*)

def addAttribute(tableFilter: String Boolean, attributes: (String, JsValue)*): Operation =
mapEntity(
Expand All @@ -331,8 +343,10 @@ object Operation {
y + a
}
)

def addAttribute(tables: Seq[String], attributes: (String, JsValue)*): Operation = addAttribute(t tables.contains(t), attributes: _*)
def addAttribute(table: String, attributes: (String, JsValue)*): Operation = addAttribute(_ == table, attributes: _*)

def addAttribute(table: String, attributes: (String, JsValue)*): Operation = addAttribute(_ == table, attributes: _*)

def addAttributeIfAbsent(tableFilter: String Boolean, attributes: (String, JsValue)*): Operation =
mapEntity(tableFilter, { x
Expand All @@ -345,5 +359,6 @@ object Operation {
})

def addAttributeIfAbsent(tables: Seq[String], attributes: (String, JsValue)*): Operation = addAttribute(t tables.contains(t), attributes: _*)
def addAttributeIfAbsent(table: String, attributes: (String, JsValue)*): Operation = addAttribute(_ == table, attributes: _*)

def addAttributeIfAbsent(table: String, attributes: (String, JsValue)*): Operation = addAttribute(_ == table, attributes: _*)
}

0 comments on commit da0dbe2

Please sign in to comment.