Skip to content

Commit

Permalink
#30 Add method to query ElasticSearch cluster health
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Nov 20, 2017
1 parent b2896df commit d49818e
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 19 deletions.
6 changes: 5 additions & 1 deletion app/org/elastic4play/database/DBConfiguration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import play.api.{ Configuration, Logger }
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }
import com.sksamuel.elastic4s.ElasticDsl.{ ClearScrollDefinitionExecutable, CreateIndexDefinitionExecutable, DeleteByIdDefinitionExecutable, GetDefinitionExecutable, IndexDefinitionExecutable, IndexExistsDefinitionExecutable, ScrollExecutable, SearchDefinitionExecutable, UpdateDefinitionExecutable }
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.admin.IndexExistsDefinition
import com.sksamuel.elastic4s.bulk.RichBulkItemResponse
import com.sksamuel.elastic4s.cluster.ClusterHealthDefinition
import com.sksamuel.elastic4s.delete.DeleteByIdDefinition
import com.sksamuel.elastic4s.get.{ GetDefinition, RichGetResponse }
import com.sksamuel.elastic4s.index.RichIndexResponse
Expand All @@ -23,6 +24,7 @@ import com.sksamuel.elastic4s.streams.ReactiveElastic.ReactiveElastic
import com.sksamuel.elastic4s.streams.{ RequestBuilder, ResponseListener }
import com.sksamuel.elastic4s.update.{ RichUpdateResponse, UpdateDefinition }
import com.sksamuel.elastic4s.{ ElasticsearchClientUri, TcpClient }
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse
import org.elasticsearch.action.delete.DeleteResponse
Expand Down Expand Up @@ -90,6 +92,8 @@ class DBConfiguration(
def execute(getDefinition: GetDefinition): Future[RichGetResponse] = client.execute(getDefinition)
@Timed("database.clear_scroll")
def execute(clearScrollDefinition: ClearScrollDefinition): Future[ClearScrollResult] = client.execute(clearScrollDefinition)
@Timed("database.cluster_health")
def execute(clusterHealthDefinition: ClusterHealthDefinition): Future[ClusterHealthResponse] = client.execute(clusterHealthDefinition)

/**
* Creates a Source (akka stream) from the result of the search
Expand Down
77 changes: 59 additions & 18 deletions app/org/elastic4play/database/DBIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import scala.concurrent.{ ExecutionContext, Future, blocking }

import play.api.{ Configuration, Logger }

import com.sksamuel.elastic4s.ElasticDsl.{ RichFuture, index, mapping, search }
import com.sksamuel.elastic4s.ElasticDsl.{ RichFuture, clusterHealth, index, mapping, search }
import com.sksamuel.elastic4s.indexes.CreateIndexDefinition

import org.elastic4play.models.{ ChildModelDef, ModelAttributes, ModelDef }
Expand All @@ -28,8 +28,10 @@ class DBIndex(
ec)

private[DBIndex] lazy val logger = Logger(getClass)

/**
* Create a new index. Collect mapping for all attributes of all entities
*
* @param models list of all ModelAttributes to used in order to build index mapping
* @return a future which is completed when index creation is finished
*/
Expand All @@ -49,12 +51,13 @@ class DBIndex(
.numericDetection(false)
}
.toSeq
db.execute {
CreateIndexDefinition(db.indexName)
.mappings(modelsMapping)
.shards(nbShards)
.replicas(nbReplicas)
}
db
.execute {
CreateIndexDefinition(db.indexName)
.mappings(modelsMapping)
.shards(nbShards)
.replicas(nbReplicas)
}
.map { _ ⇒ () }
}

Expand All @@ -64,11 +67,13 @@ class DBIndex(
* @return future of true if the index exists
*/
def getIndexStatus: Future[Boolean] = {
db.execute {
index exists db.indexName
} map { indicesExistsResponse ⇒
indicesExistsResponse.isExists
}
db
.execute {
index.exists(db.indexName)
}
.map {
_.isExists
}
}

/**
Expand All @@ -86,11 +91,47 @@ class DBIndex(
* @param modelName name of the document type from which the count must be done
* @return document count
*/
def getSize(modelName: String): Future[Long] = db.execute {
search(db.indexName → modelName).matchAllQuery().size(0)
} map { searchResponse ⇒
searchResponse.totalHits
} recover {
case _ ⇒ 0L
def getSize(modelName: String): Future[Long] =
db
.execute {
search(db.indexName → modelName).matchAllQuery().size(0)
}
.map {
_.totalHits
}
.recover { case _ ⇒ 0L }

/**
* Get cluster status:
* 0: green
* 1: yellow
* 2: red
*
* @return cluster status
*/
def getClusterStatus: Future[Int] = {
db
.execute {
clusterHealth(db.indexName)
}
.map {
_.getStatus.value().toInt
}
.recover { case _ ⇒ 2 }
}

def clusterStatus = blocking {
getClusterStatus.await
}

def getClusterStatusName: Future[String] = getClusterStatus.map {
case 0 ⇒ "green"
case 1 ⇒ "yellow"
case 2 ⇒ "red"
case _ ⇒ "unknown"
}

def clusterStatusName = blocking {
getClusterStatusName.await
}
}

0 comments on commit d49818e

Please sign in to comment.