Skip to content

Commit

Permalink
Implement changes in Filter aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
vanjaftn committed Nov 7, 2023
1 parent 1c03119 commit 0627b78
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 52 deletions.
29 changes: 29 additions & 0 deletions docs/overview/aggregations/elastic_aggregation_filter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
---
id: elastic_aggregation_filter
title: "Filter Aggregation"
---

The `Filter` aggregation is a single bucket aggregation that narrows down the entire set of documents to a specific set that matches a [query](https://lambdaworks.github.io/zio-elasticsearch/overview/elastic_query).

In order to use the `Filter` aggregation import the following:
```scala
import zio.elasticsearch.aggregation.FilterAggregation
import zio.elasticsearch.ElasticAggregation.filterAggregation
```

You can create a `Filter` aggregation using the `filterAggregation` method in the following manner:
```scala
val aggregation: FilterAggregation = filterAggregation(name = "filterAggregation", query = term(field = Document.stringField, value = "test"))
```

If you want to add aggregation (on the same level), you can use `withAgg` method:
```scala
val multipleAggregations: MultipleAggregations = filterAggregation(name = "filterAggregation", query = term(field = Document.stringField, value = "test")).withAgg(maxAggregation(name = "maxAggregation", field = Document.doubleField))
```

If you want to add another sub-aggregation, you can use `withSubAgg` method:
```scala
val aggregationWithSubAgg: FilterAggregation = filterAggregation(name = "filterAggregation", query = term(field = Document.stringField, value = "test")).withSubAgg(maxAggregation(name = "maxAggregation", field = Document.intField))
```

You can find more information about `Filter` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-filter-aggregation.html).
Original file line number Diff line number Diff line change
Expand Up @@ -104,35 +104,53 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using filter aggregation ttt") {
val expectedResponse = ("aggregation", FilterAggregationResult(
docCount = 1,
subAggregations = Map(
"subAggregation" -> MaxAggregationResult(value = 5.0)
test("aggregate using filter aggregation") {
val expectedResponse = (
"aggregation",
FilterAggregationResult(
docCount = 2,
subAggregations = Map(
"subAggregation" -> MaxAggregationResult(value = 5.0)
)
)
))
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
)
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument, thirdDocumentId, thirdDocument) =>
for {
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
firstDocumentUpdated = firstDocument.copy(stringField = "test", intField = 7)
secondDocumentUpdated =
secondDocument.copy(stringField = "filterAggregation", intField = 3)
thirdDocumentUpdated =
thirdDocument.copy(stringField = "filterAggregation", intField = 5)
_ <- Executor.execute(
ElasticRequest.upsert[TestDocument](
firstSearchIndex,
firstDocumentId,
firstDocument.copy(stringField = "test", intField = 5)
firstDocumentUpdated
)
)
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](
firstSearchIndex,
secondDocumentId,
secondDocument.copy(stringField = "test1", intField = 7)
).refreshTrue
secondDocumentUpdated
)
)
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](
firstSearchIndex,
thirdDocumentId,
thirdDocumentUpdated
)
.refreshTrue
)
query = term(TestDocument.stringField, "test")
query = term(field = TestDocument.stringField, value = secondDocumentUpdated.stringField.toLowerCase)

aggregation =
filterAggregation(name = "aggregation", query).withSubAgg(
filterAggregation(name = "aggregation", query = query).withSubAgg(
maxAggregation("subAggregation", TestDocument.intField)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,19 @@ object ElasticAggregation {
final def cardinalityAggregation(name: String, field: String): CardinalityAggregation =
Cardinality(name = name, field = field, missing = None)

// Scala doc
final def filterAggregation(name: String, query: ElasticQuery[_]): FilterAggregation = {
/**
* Constructs an instance of [[zio.elasticsearch.aggregation.FilterAggregation]] using the specified parameters.
*
* @param name
* aggregation name
* @param query
* a query which the documents must match
* @return
* an instance of [[zio.elasticsearch.aggregation.FilterAggregation]] that represents filter aggregation to be
* performed.
*/
final def filterAggregation(name: String, query: ElasticQuery[_]): FilterAggregation =
Filter(name = name, query = query, subAggregations = Chunk.empty)
}
// // Scala doc
// final def filterAggregation(name: String, query: ElasticQuery[_]): FilterAggregation =
// Filter(name = name, query = query, subAggregations = Chunk.empty)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.MaxAggregation]] using the specified parameters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import zio.Chunk
import zio.elasticsearch.ElasticAggregation.multipleAggregations
import zio.elasticsearch.ElasticPrimitive.ElasticPrimitiveOps
import zio.elasticsearch.aggregation.options._
import zio.elasticsearch.query.{ElasticQuery, TermQuery}
import zio.elasticsearch.query.ElasticQuery
import zio.elasticsearch.query.sort.Sort
import zio.elasticsearch.script.Script
import zio.json.ast.Json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ object AggregationResponse {
AvgAggregationResult(value)
case CardinalityAggregationResponse(value) =>
CardinalityAggregationResult(value)
case FilterAggregationResponse(docCount, subAggregations) =>
case FilterAggregationResponse(docCount, subAggregations) => {
FilterAggregationResult(
docCount = docCount,
subAggregations = subAggregations.fold(Map[String, AggregationResult]())(_.map { case (key, response) =>
(key, toResult(response))
})
subAggregations = subAggregations.fold(Map[String, AggregationResult]())(a =>
a.map { case (key, response) =>
(key, toResult(response))
}
)
)
}
case MaxAggregationResponse(value) =>
MaxAggregationResult(value)
case MinAggregationResponse(value) =>
Expand Down Expand Up @@ -185,7 +188,7 @@ private[elasticsearch] object FilterAggregationResponse {
case str if str.contains("cardinality#") =>
(field.split("#")(1), data.asInstanceOf[CardinalityAggregationResponse])
case str if str.contains("filter#") =>
(field.split("#")(1), data.asInstanceOf[PercentilesAggregationResponse])
(field.split("#")(1), data.asInstanceOf[FilterAggregationResponse])
case str if str.contains("max#") =>
(field.split("#")(1), data.asInstanceOf[MaxAggregationResponse])
case str if str.contains("min#") =>
Expand Down Expand Up @@ -213,6 +216,42 @@ private[elasticsearch] object FilterAggregationResponse {

}

private[elasticsearch] final case class MaxAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object MaxAggregationResponse {
implicit val decoder: JsonDecoder[MaxAggregationResponse] = DeriveJsonDecoder.gen[MaxAggregationResponse]
}

private[elasticsearch] final case class MinAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object MinAggregationResponse {
implicit val decoder: JsonDecoder[MinAggregationResponse] = DeriveJsonDecoder.gen[MinAggregationResponse]
}

private[elasticsearch] final case class MissingAggregationResponse(@jsonField("doc_count") docCount: Int)
extends AggregationResponse

private[elasticsearch] object MissingAggregationResponse {
implicit val decoder: JsonDecoder[MissingAggregationResponse] = DeriveJsonDecoder.gen[MissingAggregationResponse]
}

private[elasticsearch] final case class PercentilesAggregationResponse(values: Map[String, Double])
extends AggregationResponse

private[elasticsearch] object PercentilesAggregationResponse {
implicit val decoder: JsonDecoder[PercentilesAggregationResponse] =
DeriveJsonDecoder.gen[PercentilesAggregationResponse]
}

private[elasticsearch] final case class SumAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object SumAggregationResponse {
implicit val decoder: JsonDecoder[SumAggregationResponse] = DeriveJsonDecoder.gen[SumAggregationResponse]

}

private[elasticsearch] sealed trait AggregationBucket

private[elasticsearch] final case class TermsAggregationResponse(
@jsonField("doc_count_error_upper_bound")
docErrorCount: Int,
Expand Down Expand Up @@ -249,8 +288,7 @@ private[elasticsearch] object TermsAggregationBucket {
case str if str.contains("cardinality#") =>
Some(field -> CardinalityAggregationResponse(value = objFields("value").unsafeAs[Int]))
case str if str.contains("filter#") =>
Some(field -> "")
// Some(field -> data.unsafeAs[FilterAggregationResponse](FilterAggregationBucket.decoder))
Some(field -> data.unsafeAs[FilterAggregationResponse](FilterAggregationResponse.decoder))
case str if str.contains("max#") =>
Some(field -> MaxAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("min#") =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,32 @@ package zio.elasticsearch.result

import zio.Chunk

import scala.util.{Failure, Success, Try}

sealed trait AggregationResult

final case class AvgAggregationResult private[elasticsearch] (value: Double) extends AggregationResult

final case class CardinalityAggregationResult private[elasticsearch] (value: Int) extends AggregationResult

final case class FilterAggregationResult private[elasticsearch] (
docCount: Int,
subAggregations: Map[String, AggregationResult]
) extends AggregationResult {

def subAggregationAs[A <: AggregationResult](aggName: String): Either[DecodingException, Option[A]] =
subAggregations.get(aggName) match {
case Some(aggRes) =>
aggRes match {
case agg: A =>
Right(Some(agg))
case _ =>
Left(DecodingException(s"Aggregation with name $aggName was not of type you provided."))
}
case None =>
Right(None)
}

}

final case class MaxAggregationResult private[elasticsearch] (value: Double) extends AggregationResult

final case class MinAggregationResult private[elasticsearch] (value: Double) extends AggregationResult
Expand Down Expand Up @@ -79,9 +97,11 @@ final case class FilterAggregationResult private[elasticsearch] (
def subAggregationAs[A <: AggregationResult](aggName: String): Either[DecodingException, Option[A]] =
subAggregations.get(aggName) match {
case Some(aggRes) =>
Try(aggRes.asInstanceOf[A]) match {
case Failure(_) => Left(DecodingException(s"Aggregation with name $aggName was not of type you provided."))
case Success(agg) => Right(Some(agg))
aggRes match {
case agg: A =>
Right(Some(agg))
case _ =>
Left(DecodingException(s"Aggregation with name $aggName was not of type you provided."))
}
case None =>
Right(None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,13 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
)
},
test("filter") {
val query = term(TestDocument.stringField, "test")
val aggregation = filterAggregation("aggregation", query)
val aggregationTs = filterAggregation("aggregation", query)
val aggregationTsRaw = filterAggregation("aggregation", query)
val query = term(TestDocument.stringField, "test")
val aggregation = filterAggregation("aggregation", query)
val aggregationWithSubAggregation =
filterAggregation("aggregation", query).withSubAgg(minAggregation("subAggregation", TestDocument.intField))
val aggregationWithMultipleSubAggregations = filterAggregation("aggregation", query)
.withSubAgg(minAggregation("minSubAggregation", TestDocument.intField))
.withSubAgg(maxAggregation("maxSubAggregation", TestDocument.intField))

assert(aggregation)(
equalTo(
Expand All @@ -144,20 +147,23 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
)
)
) &&
assert(aggregationTs)(
assert(aggregationWithSubAggregation)(
equalTo(
Filter(
name = "aggregation",
subAggregations = Chunk.empty,
subAggregations = Chunk(minAggregation("subAggregation", TestDocument.intField)),
query = query
)
)
) &&
assert(aggregationTsRaw)(
assert(aggregationWithMultipleSubAggregations)(
equalTo(
Filter(
name = "aggregation",
subAggregations = Chunk.empty,
subAggregations = Chunk(
maxAggregation("maxSubAggregation", TestDocument.intField),
minAggregation("minSubAggregation", TestDocument.intField)
),
query = query
)
)
Expand Down Expand Up @@ -683,12 +689,13 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
assert(aggregationWithMissing.toJson)(equalTo(expectedWithMissing.toJson))
},
test("filter") {
val query = term(TestDocument.stringField, "test")
val aggregation = filterAggregation("aggregation", query)
val aggregationTs = filterAggregation("aggregation", query)
val aggregationWithSubAggregation = filterAggregation("aggregation", query).withSubAgg(
minAggregation("subAggregation", TestDocument.intField)
)
val query = term(TestDocument.stringField, "test")
val aggregation = filterAggregation("aggregation", query)
val aggregationWithSubAggregation =
filterAggregation("aggregation", query).withSubAgg(minAggregation("subAggregation", TestDocument.intField))
val aggregationWithMultipleSubAggregations = filterAggregation("aggregation", query)
.withSubAgg(minAggregation("minSubAggregation", TestDocument.intField))
.withSubAgg(maxAggregation("maxSubAggregation", TestDocument.intField))

val expected =
"""
Expand All @@ -705,7 +712,7 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
|}
|""".stripMargin

val expectedTs =
val expectedWithSubAggregation =
"""
|{
| "aggregation": {
Expand All @@ -715,12 +722,19 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
| "value": "test"
| }
| }
| }
| },
| "aggs": {
| "subAggregation": {
| "min": {
| "field": "intField"
| }
| }
| }
| }
|}
|""".stripMargin

val expectedWithSubAggregation =
val expectedWithMultipleSubAggregations =
"""
|{
| "aggregation": {
Expand All @@ -732,7 +746,12 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
| }
| },
| "aggs": {
| "subAggregation": {
| "maxSubAggregation": {
| "max": {
| "field": "intField"
| }
| },
| "minSubAggregation": {
| "min": {
| "field": "intField"
| }
Expand All @@ -743,8 +762,8 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
|""".stripMargin

assert(aggregation.toJson)(equalTo(expected.toJson)) &&
assert(aggregationTs.toJson)(equalTo(expectedTs.toJson)) &&
assert(aggregationWithSubAggregation.toJson)(equalTo(expectedWithSubAggregation.toJson))
assert(aggregationWithSubAggregation.toJson)(equalTo(expectedWithSubAggregation.toJson)) &&
assert(aggregationWithMultipleSubAggregations.toJson)(equalTo(expectedWithMultipleSubAggregations.toJson))
},
test("max") {
val aggregation = maxAggregation("aggregation", "testField")
Expand Down
1 change: 1 addition & 0 deletions website/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ module.exports = {
'overview/aggregations/elastic_aggregation_bucket_selector',
'overview/aggregations/elastic_aggregation_bucket_sort',
'overview/aggregations/elastic_aggregation_cardinality',
'overview/aggregations/elastic_aggregation_filter',
'overview/aggregations/elastic_aggregation_max',
'overview/aggregations/elastic_aggregation_min',
'overview/aggregations/elastic_aggregation_missing',
Expand Down

0 comments on commit 0627b78

Please sign in to comment.