Skip to content

Commit

Permalink
Support BucketSelector aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
dbulaja98 committed May 11, 2023
1 parent ebdade8 commit ab3fb71
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ import zio.elasticsearch.ElasticQuery._
import zio.elasticsearch.ElasticSort.sortBy
import zio.elasticsearch.domain.{PartialTestDocument, TestDocument, TestSubDocument}
import zio.elasticsearch.executor.Executor
import zio.elasticsearch.executor.response.{CardinalityAggregationResponse, MaxAggregationResponse}
import zio.elasticsearch.query.DistanceUnit.Kilometers
import zio.elasticsearch.executor.response.{
CardinalityAggregationResponse,
MaxAggregationResponse,
TermsAggregationResponse
}
import zio.elasticsearch.query.sort.SortMode.Max
import zio.elasticsearch.query.sort.SortOrder._
import zio.elasticsearch.query.sort.SourceType.NumberType
Expand Down Expand Up @@ -307,27 +311,36 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("search using match all query with terms aggregations with nested terms aggregation") {
test(
"search using match all query with terms aggregations with nested max aggregation and bucketSelector aggregation"
) {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
for {
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
_ <- Executor.execute(
ElasticRequest.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument)
)
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument)
.refreshTrue
.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument.copy(intField = 5))
)
_ <-
Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument.copy(intField = 100))
.refreshTrue
)
query = matchAll
aggregation =
termsAggregation(
name = "aggregationString",
field = TestDocument.stringField.keyword
).withSubAgg(
termsAggregation(name = "aggregationInt", field = "intField")
)
).withSubAgg(maxAggregation(name = "aggregationInt", field = TestDocument.intField))
.withSubAgg(
bucketSelectorAggregation(
name = "aggregationSelector",
script = Script("params.aggregation_int > 10"),
bucketsPath = Map("aggregation_int" -> "aggregationInt")
)
)
res <- Executor.execute(
ElasticRequest
.search(
Expand All @@ -338,7 +351,9 @@ object HttpExecutorSpec extends IntegrationSpec {
)
docs <- res.documentAs[TestDocument]
aggs <- res.aggregations
} yield assert(docs)(isNonEmpty) && assert(aggs)(isNonEmpty)
} yield assert(docs)(isNonEmpty) && assert(
aggs("aggregationString").asInstanceOf[TermsAggregationResponse].buckets.size
)(equalTo(1))
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Expand Down Expand Up @@ -1487,7 +1502,7 @@ object HttpExecutorSpec extends IntegrationSpec {
req6 = ElasticRequest.updateByScript(
index,
firstDocumentId,
Script("ctx._source.intField = params['factor']").withParams("factor" -> 100)
Script("ctx._source.intField = params['factor']").params("factor" -> 100)
)
req7 =
ElasticRequest
Expand Down Expand Up @@ -1521,7 +1536,7 @@ object HttpExecutorSpec extends IntegrationSpec {
ElasticRequest.updateByScript(
index,
documentId,
Script("ctx._source.intField += params['factor']").withParams("factor" -> factor)
Script("ctx._source.intField += params['factor']").params("factor" -> factor)
)
)
doc <- Executor.execute(ElasticRequest.getById(index, documentId)).documentAs[TestDocument]
Expand All @@ -1536,7 +1551,7 @@ object HttpExecutorSpec extends IntegrationSpec {
.updateByScript(
index,
documentId,
Script("ctx._source.intField += params['factor']").withParams("factor" -> 2)
Script("ctx._source.intField += params['factor']").params("factor" -> 2)
)
.orCreate(document)
)
Expand Down Expand Up @@ -1567,7 +1582,7 @@ object HttpExecutorSpec extends IntegrationSpec {
ElasticRequest
.updateAllByQuery(
updateByQueryIndex,
Script("ctx._source['stringField'] = params['str']").withParams("str" -> stringField)
Script("ctx._source['stringField'] = params['str']").params("str" -> stringField)
)
.refreshTrue
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,31 @@ package zio.elasticsearch

import zio.Chunk
import zio.elasticsearch.aggregation._
import zio.elasticsearch.script.Script

object ElasticAggregation {

/**
* Constructs an instance of [[zio.elasticsearch.aggregation.BucketSelectorAggregation]] using the specified
* parameters.
*
* @param name
* aggregation name
* @param script
* The script to run for this aggregation. The script can be inline, file or indexed
* @param bucketsPath
* A map of script variables and their associated path to the buckets we wish to use for the variable
* @return
* an instance of [[zio.elasticsearch.aggregation.BucketSelectorAggregation]] that represents bucket selector
* aggregation to be performed.
*/
final def bucketSelectorAggregation(
name: String,
script: Script,
bucketsPath: Map[String, String]
): BucketSelectorAggregation =
BucketSelector(name = name, script = script, bucketsPath = bucketsPath)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.CardinalityAggregation]] using the specified
* parameters. It calculates an approximate count of distinct values.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import zio.Chunk
import zio.elasticsearch.ElasticAggregation.multipleAggregations
import zio.elasticsearch.ElasticPrimitive.ElasticPrimitiveOps
import zio.elasticsearch.aggregation.options._
import zio.elasticsearch.script.Script
import zio.json.ast.Json
import zio.json.ast.Json.{Arr, Obj}

Expand All @@ -32,6 +33,23 @@ sealed trait ElasticAggregation { self =>

sealed trait SingleElasticAggregation extends ElasticAggregation

sealed trait BucketSelectorAggregation extends SingleElasticAggregation with WithAgg

private[elasticsearch] final case class BucketSelector(name: String, script: Script, bucketsPath: Map[String, String])
extends BucketSelectorAggregation { self =>

def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
multipleAggregations.aggregations(self, agg)

private[elasticsearch] def paramsToJson: Json = {
val bucketsPathJson: Json = Obj("buckets_path" -> bucketsPath.collect { case (scriptVal, path) =>
Obj(scriptVal -> path.toJson)
}.reduce(_ merge _))

Obj(name -> Obj("bucket_selector" -> (bucketsPathJson merge Obj("script" -> script.toJson))))
}
}

sealed trait CardinalityAggregation
extends SingleElasticAggregation
with HasMissing[CardinalityAggregation]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ private[elasticsearch] final case class Script(
def lang(value: String): Script =
self.copy(lang = Some(value))

def withParams(values: (String, Any)*): Script =
def params(values: (String, Any)*): Script =
self.copy(params = params ++ values.toList)

def toJson: Json =
private[elasticsearch] def toJson: Json =
Obj(
List(
self.lang.map(lang => "lang" -> lang.toJson),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ private[elasticsearch] trait HasParams[S <: HasParams[S]] {
* @return
* an instance of the [[zio.elasticsearch.script.Script]] enriched with the `params` parameter.
*/
def withParams(values: (String, Any)*): S
def params(values: (String, Any)*): S
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import zio.elasticsearch.aggregation._
import zio.elasticsearch.domain.{TestDocument, TestSubDocument}
import zio.elasticsearch.query.sort.SortOrder
import zio.elasticsearch.query.sort.SortOrder.{Asc, Desc}
import zio.elasticsearch.script.Script
import zio.elasticsearch.utils._
import zio.test.Assertion.equalTo
import zio.test._
Expand All @@ -14,6 +15,37 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
def spec: Spec[TestEnvironment, Any] =
suite("ElasticAggregation")(
suite("constructing")(
test("bucketSelector") {
val aggregation1 = bucketSelectorAggregation(
name = "aggregation",
script = Script("params.agg1 > 10"),
bucketsPath = Map("agg1" -> "aggregation1")
)
val aggregation2 = bucketSelectorAggregation(
name = "aggregation",
script = Script("params.agg1 + params.agg2 > 10"),
bucketsPath = Map("agg1" -> "aggregation1", "agg2" -> "aggregation2")
)

assert(aggregation1)(
equalTo(
BucketSelector(
name = "aggregation",
script = Script("params.agg1 > 10"),
bucketsPath = Map("agg1" -> "aggregation1")
)
)
) &&
assert(aggregation2)(
equalTo(
BucketSelector(
name = "aggregation",
script = Script("params.agg1 + params.agg2 > 10"),
bucketsPath = Map("agg1" -> "aggregation1", "agg2" -> "aggregation2")
)
)
)
},
test("cardinality") {
val aggregation = cardinalityAggregation("aggregation", "testField")
val aggregationTs = cardinalityAggregation("aggregation", TestSubDocument.intField)
Expand Down Expand Up @@ -237,6 +269,58 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
}
),
suite("encoding as JSON")(
test("bucketSelector") {
val aggregation1 = bucketSelectorAggregation(
name = "aggregation",
script = Script("params.agg1 > 10"),
bucketsPath = Map("agg1" -> "aggregation1")
)
val aggregation2 = bucketSelectorAggregation(
name = "aggregation",
script = Script("params.agg1 + params.agg2 > 10"),
bucketsPath = Map("agg1" -> "aggregation1", "agg2" -> "aggregation2")
)

val expected1 =
"""
|{
| "aggs": {
| "aggregation": {
| "bucket_selector": {
| "buckets_path": {
| "agg1": "aggregation1"
| },
| "script": {
| "source": "params.agg1 > 10"
| }
| }
| }
| }
|}
|""".stripMargin

val expected2 =
"""
|{
| "aggs": {
| "aggregation": {
| "bucket_selector": {
| "buckets_path": {
| "agg1": "aggregation1",
| "agg2": "aggregation2"
| },
| "script": {
| "source": "params.agg1 + params.agg2 > 10"
| }
| }
| }
| }
|}
|""".stripMargin

assert(aggregation1.toJson)(equalTo(expected1.toJson)) &&
assert(aggregation2.toJson)(equalTo(expected2.toJson))
},
test("cardinality") {
val aggregation = cardinalityAggregation("aggregation", "testField")
val aggregationTs = cardinalityAggregation("aggregation", TestDocument.intField)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ object ElasticRequestDSLSpec extends ZIOSpecDefault {
val jsonRequest = updateByScript(
index = Index,
id = DocId,
script = Script("ctx._source.intField += params['factor']").withParams("factor" -> 2)
script = Script("ctx._source.intField += params['factor']").params("factor" -> 2)
).orCreate[TestDocument](
TestDocument(
stringField = "stringField",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ object HttpElasticExecutorSpec extends SttpBackendStubSpec {
.updateByScript(
index = index,
id = DocumentId("V4x8q4UB3agN0z75fv5r"),
script = Script("ctx._source.intField += params['factor']").withParams("factor" -> 2)
script = Script("ctx._source.intField += params['factor']").params("factor" -> 2)
)
.orCreate(doc = secondDoc)
.routing(Routing("routing"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object ScriptSpec extends ZIOSpecDefault {
assert(Script("doc['day_of_week'].value"))(equalTo(Script("doc['day_of_week'].value", Map.empty, None)))
},
test("successfully create Script with source and params") {
assert(Script("doc['day_of_week'].value * params['factor']").withParams("factor" -> 2))(
assert(Script("doc['day_of_week'].value * params['factor']").params("factor" -> 2))(
equalTo(Script("doc['day_of_week'].value * params['factor']", Map("factor" -> 2), None))
)
},
Expand All @@ -24,7 +24,7 @@ object ScriptSpec extends ZIOSpecDefault {
)
},
test("successfully create Script with source, params and lang") {
assert(Script("doc['day_of_week'].value * params['factor']").withParams("factor" -> 2).lang("painless"))(
assert(Script("doc['day_of_week'].value * params['factor']").params("factor" -> 2).lang("painless"))(
equalTo(Script("doc['day_of_week'].value * params['factor']", Map("factor" -> 2), Some("painless")))
)
}
Expand All @@ -44,7 +44,7 @@ object ScriptSpec extends ZIOSpecDefault {
),
suite("encoding Script as JSON")(
test("properly encode Script with source and params") {
val script = Script("doc['day_of_week'].value * params['factor']").withParams("factor" -> 2)
val script = Script("doc['day_of_week'].value * params['factor']").params("factor" -> 2)
val expected =
"""
|{
Expand All @@ -70,7 +70,7 @@ object ScriptSpec extends ZIOSpecDefault {
assert(script.toJson)(equalTo(expected.toJson))
},
test("properly encode Script with source, params and lang") {
val script = Script("doc['day_of_week'].value * params['factor']").withParams("factor" -> 2).lang("painless")
val script = Script("doc['day_of_week'].value * params['factor']").params("factor" -> 2).lang("painless")
val expected =
"""
|{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ object SortSpec extends ZIOSpecDefault {
},
test("successfully create SortByScript with given `mode`") {
assert(
sortBy(Script(source = "doc['day_of_week'].value * params['factor']").withParams("factor" -> 2), NumberType)
sortBy(Script(source = "doc['day_of_week'].value * params['factor']").params("factor" -> 2), NumberType)
.mode(Avg)
)(
equalTo(
Expand Down Expand Up @@ -209,7 +209,7 @@ object SortSpec extends ZIOSpecDefault {
test("successfully create SortByScript with given `mode` and `order`") {
assert(
sortBy(
Script(source = "doc['day_of_week'].value * params['factor']").withParams("factor" -> 2).lang("painless"),
Script(source = "doc['day_of_week'].value * params['factor']").params("factor" -> 2).lang("painless"),
NumberType
)
.mode(Avg)
Expand Down Expand Up @@ -414,7 +414,7 @@ object SortSpec extends ZIOSpecDefault {
},
test("properly encode SortByScript with given `mode`") {
val sort = sortBy(
script = Script("doc['day_of_week'].value * params['factor']").withParams("factor" -> 2),
script = Script("doc['day_of_week'].value * params['factor']").params("factor" -> 2),
sourceType = NumberType
)
.mode(Avg)
Expand Down Expand Up @@ -465,7 +465,7 @@ object SortSpec extends ZIOSpecDefault {
},
test("properly encode SortByScript with `mode` and `order`") {
val sort = sortBy(
Script(source = "doc['day_of_week'].value * params['factor']").withParams("factor" -> 2).lang("painless"),
Script(source = "doc['day_of_week'].value * params['factor']").params("factor" -> 2).lang("painless"),
NumberType
)
.mode(Avg)
Expand Down

0 comments on commit ab3fb71

Please sign in to comment.